6. Compilers Example

In this example we will create a unified_planning problem and then show how to use a compiler on it to obtain a new equivalent problem; then, we will get a plan for the compiled problem and translate it into the equivalent plan for the original problem.

6.1. Setup

We start by installing the library with pip.

[1]:
!apt install graphviz graphviz-dev
%pip install unified-planning[tamer,plot]

6.2. Define the UP Problem

For this example we will create a problem with the following features:
- the default temperature is cold
- 2 jobs that can be done only if it is warm
- 2 heaters with some quirks:
  1) If both heaters are switched on at the same time, it will cause an electrical failure.
  2) Once a heater is switched on, the heat it provides can be used only for one job; after that, the heater will not provide heat anymore.
  3) Every heater can be switched on only once.

In the end we want every job done, no heaters switched on and no electrical failure.
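To make these rules concrete before modelling them with the library, here is a minimal pure-Python sketch of the same domain. It does not use the unified_planning API: the State class and the helper functions are hypothetical names introduced only for illustration, and the is_cold fluent is left out because no condition or goal depends on it.

```python
from dataclasses import dataclass, field


@dataclass
class State:
    is_warm: bool = False
    electrical_failure: bool = False
    on: set = field(default_factory=set)    # heaters currently switched on
    used: set = field(default_factory=set)  # heaters that were already used
    done: set = field(default_factory=set)  # jobs already performed


def switch_heater_on(s, heater):
    assert heater not in s.used and heater not in s.on, "precondition violated"
    if s.on:  # some other heater is already on
        s.electrical_failure = True
    s.is_warm = True
    s.on.add(heater)


def switch_heater_off(s, heater):
    assert heater in s.on, "precondition violated"
    s.is_warm = False
    s.on.remove(heater)
    s.used.add(heater)


def perform_job(s, job):
    assert s.is_warm, "precondition violated"
    s.is_warm = False  # the heat is consumed by the job
    s.done.add(job)


def goal_reached(s, jobs):
    return not s.electrical_failure and s.done == set(jobs) and not s.on


# One heater per job: switch it on, do the job, switch it off.
s = State()
for heater, job in [("heater_1", "clean"), ("heater_2", "work")]:
    switch_heater_on(s, heater)
    perform_job(s, job)
    switch_heater_off(s, heater)
print(goal_reached(s, ["clean", "work"]))
```

Switching on the second heater while the first one is still on would set electrical_failure, and nothing in the domain resets it, so the goal could no longer be reached.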

[2]:
from unified_planning.shortcuts import *

# Define User Types
Heater = UserType("Heater")
Job = UserType("Job")
Clean = UserType("Clean", Job)
Work = UserType("Work", Job)

# Define fluents
is_cold = Fluent("is_cold", BoolType()) # BoolType is the default, so it can be omitted
is_warm = Fluent("is_warm")
electrical_failure = Fluent("electrical_failure")
job_done = Fluent("job_done", BoolType(), job = Job)
is_on = Fluent("is_on", BoolType(), heater = Heater)
used_heater = Fluent("used_heater", BoolType(), heater = Heater)

# Define actions
switch_heater_on = InstantaneousAction("switch_heater_on", heater = Heater)
heater = switch_heater_on.parameter("heater")
switch_heater_on.add_precondition(Not(used_heater(heater))) # The heater must not have been already used
switch_heater_on.add_precondition(Not(is_on(heater)))       # The heater must not be already on
switch_heater_on.add_effect(is_warm, True)                  # The temperature becomes warm
switch_heater_on.add_effect(is_on(heater), True)            # The heater switches on
# Define a Variable of type "Heater", used for the existential condition
h_var = Variable("h_var", Heater)
# If there exists a heater that is already on, we have an electrical failure
switch_heater_on.add_effect(electrical_failure, True, Exists(is_on(h_var), h_var))

switch_heater_off = InstantaneousAction("switch_heater_off", heater = Heater)
heater = switch_heater_off.parameter("heater")
switch_heater_off.add_precondition(is_on(heater))       # The heater must be on
switch_heater_off.add_effect(is_warm, False)            # It is not warm anymore
switch_heater_off.add_effect(is_cold, True)             # It becomes cold
switch_heater_off.add_effect(is_on(heater), False)      # The heater turns off
switch_heater_off.add_effect(used_heater(heater), True) # The heater becomes used

perform_job = InstantaneousAction("perform_job", job = Job)
job = perform_job.parameter("job")
perform_job.add_precondition(is_warm)       # Must be warm to do the job
perform_job.add_effect(is_warm, False)      # It is not warm anymore
perform_job.add_effect(is_cold, True)       # It becomes cold again
perform_job.add_effect(job_done(job), True) # The job is done

# define objects
heater_1 = Object("heater_1", Heater)
heater_2 = Object("heater_2", Heater)

clean = Object("clean", Clean)
work = Object("work", Work)

# define the problem
original_problem = Problem("heaters_and_jobs")
# add the fluents to the problem
original_problem.add_fluent(is_cold, default_initial_value = True)
original_problem.add_fluent(is_warm, default_initial_value = False)
original_problem.add_fluent(electrical_failure, default_initial_value = False)
original_problem.add_fluent(job_done, default_initial_value = False)
original_problem.add_fluent(is_on, default_initial_value = False)
original_problem.add_fluent(used_heater, default_initial_value = False)
# add objects and actions
original_problem.add_objects([heater_1, heater_2, clean, work])
original_problem.add_actions([switch_heater_on, switch_heater_off, perform_job])

# define the problem goals
original_problem.add_goal(Not(electrical_failure))          # No electrical failure
j_var = Variable("j_var", Job)
original_problem.add_goal(Forall(job_done(j_var), j_var))   # All jobs are done
original_problem.add_goal(Forall(Not(is_on(h_var)), h_var)) # All heaters are switched off

original_problem_kind = original_problem.kind

6.3. Testing Compilers

To show the usage and the capabilities of the compilers, we will use the problem we just defined together with pyperplan, a planner that does not support some of the features of the original problem.

Thanks to the compilers, pyperplan will be able to solve the equivalent compiled problem; we will then rewrite the resulting plan back into a plan for the original problem.

6.3.1. Get pyperplan solver

We will now get the pyperplan engine and show that the original problem is not supported.

[3]:
with OneshotPlanner(name = "pyperplan") as planner:
    assert not planner.supports(original_problem_kind)
NOTE: To disable printing of planning engine credits, add this line to your code: `up.shortcuts.get_environment().credits_stream = None`
  *** Credits ***
  * In operation mode `OneshotPlanner` at line 1 of `/tmp/ipykernel_155649/2508516188.py`, you are using the following planning engine:
  * Engine name: pyperplan
  * Developers:  Albert-Ludwigs-Universität Freiburg (Yusra Alkhazraji, Matthias Frorath, Markus Grützner, Malte Helmert, Thomas Liebetraut, Robert Mattmüller, Manuela Ortlieb, Jendrik Seipp, Tobias Springenberg, Philip Stahl, Jan Wülfing)
  * Description: Pyperplan is a lightweight STRIPS planner written in Python.


6.4. First compilation kind: Quantifiers Removing

The compilation kind QUANTIFIERS_REMOVING takes a problem that might contain a quantifier operator (an Exists or a Forall) and returns an equivalent problem that does not contain any Exists or Forall.

The quantifiers in the problem can be found in:
- an action’s condition (or precondition)
- the condition of an action’s conditional effect
- the condition of a timed conditional effect
- a goal or a timed goal
- the assignment value of an effect

6.4.1. Example

The unified_planning library provides a compiler that implements this compilation kind: unified_planning.engines.compilers.QuantifiersRemover. It works by taking every expression in the problem and replacing each quantifier (Exists or Forall) with the equivalent quantifier-free formula over the objects of the problem.

For example, in our problem, the goal Forall(job_done(j_var), j_var) is equivalent to job_done(clean) and job_done(work), because there are only 2 possible values that the j_var variable can have. It is interesting to note how the j_var variable, of type Job, is instantiated to 2 objects of types Clean and Work, thanks to type inheritance.
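The grounding step can be sketched in a few lines of plain Python. This is only an illustration under simplifying assumptions, not the library's implementation: the PARENT and OBJECTS dictionaries and the ground function are hypothetical, and expressions are reduced to strings.

```python
# Hypothetical, simplified model of the type hierarchy and of the typed objects.
PARENT = {"Clean": "Job", "Work": "Job", "Job": None, "Heater": None}
OBJECTS = {"clean": "Clean", "work": "Work", "heater_1": "Heater", "heater_2": "Heater"}


def is_subtype(type_name, ancestor):
    """True if type_name is ancestor or inherits from it."""
    while type_name is not None:
        if type_name == ancestor:
            return True
        type_name = PARENT[type_name]
    return False


def objects_of(type_name):
    """All objects whose type is type_name or one of its subtypes."""
    return [o for o, t in OBJECTS.items() if is_subtype(t, type_name)]


def ground(quantifier, fluent, var_type):
    """Replace a quantifier over var_type with an And/Or over the matching objects."""
    operator = {"Forall": "And", "Exists": "Or"}[quantifier]
    return (operator, [f"{fluent}({o})" for o in objects_of(var_type)])


print(ground("Forall", "job_done", "Job"))
# ('And', ['job_done(clean)', 'job_done(work)'])
print(ground("Exists", "is_on", "Heater"))
# ('Or', ['is_on(heater_1)', 'is_on(heater_2)'])
```

A Forall becomes a conjunction and an Exists becomes a disjunction; the subtype check is what lets a variable of type Job range over the Clean and Work objects.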

The following example shows how to get a compiler from the factory and use it to remove quantifiers:

[4]:
from unified_planning.engines import CompilationKind
# The CompilationKind class is defined in the unified_planning/engines/mixins/compiler.py file

# To get the Compiler from the factory we can use the Compiler operation mode.
# It takes a problem_kind and a compilation_kind, and returns a compiler with the capabilities we need
with Compiler(
        problem_kind = original_problem_kind,
        compilation_kind = CompilationKind.QUANTIFIERS_REMOVING
    ) as quantifiers_remover:
    # After we have the compiler, we get the compilation result
    qr_result = quantifiers_remover.compile(
        original_problem,
        CompilationKind.QUANTIFIERS_REMOVING
    )
    qr_problem = qr_result.problem
    qr_kind = qr_problem.kind

    # Check the result of the compilation
    assert original_problem_kind.has_existential_conditions() and original_problem_kind.has_universal_conditions()
    assert not qr_kind.has_existential_conditions() and not qr_kind.has_universal_conditions()

6.5. Conditional Effects Removing

The conditional effects removing compilation kind takes a problem with conditional effects and returns an equivalent problem without any conditional effect.

The conditional effects can be found in:
- action’s effects
- timed effects

6.5.1. Example

The unified_planning library provides a compiler that implements this compilation kind: unified_planning.engines.compilers.ConditionalEffectsRemover.

It works by taking every action with at least one conditional effect and creating 2^N actions, where N is the number of conditional effects in the action. For each conditional effect, we have 2 variants:
- one where the condition of the conditional effect becomes an action precondition and the effect of the conditional effect becomes a normal effect of the action
- one where the negation of the condition of the conditional effect becomes an action precondition and the effect is dropped.

All the possible combinations of those 2 variants of conditional effects are added, and then impossible or empty actions are removed.
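The 2^N expansion can be sketched as follows. This is a simplified illustration, not the library's code: conditions and effects are plain strings, the expand_conditional_effects function and the numbering of the generated actions are hypothetical, and the pruning of impossible or empty actions is not shown.

```python
from itertools import product


def expand_conditional_effects(name, preconditions, effects, conditional_effects):
    """Enumerate the 2^N unconditional variants of an action.

    conditional_effects is a list of (condition, effect) pairs.
    """
    variants = []
    # Each conditional effect either fires (True) or does not fire (False).
    for choice in product([True, False], repeat=len(conditional_effects)):
        pre = list(preconditions)
        eff = list(effects)
        for fires, (condition, effect) in zip(choice, conditional_effects):
            if fires:
                pre.append(condition)  # the condition becomes a precondition
                eff.append(effect)     # the effect becomes unconditional
            else:
                pre.append(f"not ({condition})")  # the negated condition is required
        variants.append((f"{name}_{len(variants)}", pre, eff))
    return variants


variants = expand_conditional_effects(
    "switch_heater_on",
    preconditions=["not used_heater(h)", "not is_on(h)"],
    effects=["is_warm := true", "is_on(h) := true"],
    conditional_effects=[("is_on(heater_1) or is_on(heater_2)", "electrical_failure := true")],
)
for variant_name, pre, eff in variants:
    print(variant_name, pre, eff)
```

With a single conditional effect, as in switch_heater_on, the expansion yields 2 actions: one that requires another heater to be on and causes the failure, and one that requires no heater to be on and does not.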

The following example shows how to get a compiler from the factory and use it to remove conditional effects:

[5]:
# Get the compiler from the factory
with Compiler(
        problem_kind = qr_kind,
        compilation_kind = CompilationKind.CONDITIONAL_EFFECTS_REMOVING
    ) as conditional_effects_remover:
    # After we have the compiler, we get the compilation result
    cer_result = conditional_effects_remover.compile(
        qr_problem,
        CompilationKind.CONDITIONAL_EFFECTS_REMOVING
    )
    cer_problem = cer_result.problem
    cer_kind = cer_problem.kind

    # Check the result of the compilation
    assert original_problem_kind.has_conditional_effects()
    assert qr_kind.has_conditional_effects()
    assert not cer_kind.has_conditional_effects()

6.6. Disjunctive Conditions Removing

The disjunctive conditions removing compilation kind takes a problem that might have complex expressions as action conditions or goals and returns an equivalent problem where every condition is a conjunction of terms (an And of terms), where each term is either a fluent or the negation of a fluent, and not a complex expression.

The complex conditions can be found in:
- action’s conditions (or preconditions)
- the condition of an action’s conditional effect
- the condition of a timed conditional effect
- problem’s goals
- problem’s timed goals

6.6.1. Example

The unified_planning library provides a compiler that implements this compilation kind: unified_planning.engines.compilers.DisjunctiveConditionsRemover. For each action, it joins all the preconditions into a single And, rewrites that And as an equivalent disjunction of conjunctions (an Or of Ands), and then creates one action for every element of the resulting Or. Each resulting action has the same effects as the original action and one element of the Or as its precondition.

A similar pattern is used to remove disjunctions from the problem goals, but it involves adding a fresh fluent and, for every element of the resulting Or, a fresh action.
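The action-splitting idea can be sketched in plain Python. The representation below is an assumption made for illustration: expressions are nested tuples such as ("and", ...), ("or", ...) and ("not", atom), negations are assumed to be already pushed down to the atoms, and to_dnf and split_action are hypothetical helpers, not library functions. The goal rewriting with a fresh fluent is not shown.

```python
from itertools import product


def to_dnf(expr):
    """Return a list of conjunctions; each conjunction is a list of literals."""
    if isinstance(expr, str) or expr[0] == "not":
        return [[expr]]  # a single literal
    op, *args = expr
    if op == "or":
        # The disjuncts of every argument are all disjuncts of the result.
        return [conj for arg in args for conj in to_dnf(arg)]
    if op == "and":
        # Pick one conjunction from each argument and merge them.
        return [sum(combo, []) for combo in product(*(to_dnf(a) for a in args))]
    raise ValueError(f"unknown operator: {op}")


def split_action(name, precondition, effects):
    """Create one action per disjunct, all sharing the original effects."""
    return [
        (f"{name}_{i}", conj, effects)
        for i, conj in enumerate(to_dnf(precondition))
    ]


precondition = (
    "and",
    ("not", "used_heater(h)"),
    ("not", "is_on(h)"),
    ("or", "is_on(heater_1)", "is_on(heater_2)"),
)
actions = split_action("switch_heater_on", precondition, ["electrical_failure := true"])
for action_name, conj, eff in actions:
    print(action_name, conj, eff)
```

Here the single Or with 2 alternatives produces 2 actions, each with a purely conjunctive precondition.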

The following example shows how to get a compiler from the factory and use it to remove disjunctive conditions:

[6]:
# Get the compiler from the factory
with Compiler(
        problem_kind = cer_kind,
        compilation_kind = CompilationKind.DISJUNCTIVE_CONDITIONS_REMOVING
    ) as disjunctive_conditions_remover:
    # After we have the compiler, we get the compilation result
    dcr_result = disjunctive_conditions_remover.compile(
        cer_problem,
        CompilationKind.DISJUNCTIVE_CONDITIONS_REMOVING
    )
    dcr_problem = dcr_result.problem
    dcr_kind = dcr_problem.kind

    # Check the result of the compilation
    assert qr_kind.has_disjunctive_conditions()
    assert cer_kind.has_disjunctive_conditions()
    assert not dcr_kind.has_disjunctive_conditions()

6.7. Negative Conditions Removing

The negative conditions removing compilation kind takes a problem that has the Not operator in any problem condition and returns an equivalent problem where the Not doesn’t appear in the problem’s conditions.

The Not operator must be removed from:
- the action’s conditions (or preconditions)
- the condition of every action’s conditional effect
- the condition of every timed conditional effect
- problem’s goals
- problem’s timed goals

6.7.1. Example

The unified_planning library provides a compiler that implements this compilation kind: unified_planning.engines.compilers.NegativeConditionsRemover. The compilation process followed in the UP implementation is the following:
- For every fluent that appears negated in the conditions or in the goals, a new fluent that represents the same fluent negated is created.
- Every time the original fluent appears negated in an expression, it is replaced with the new fluent.
- Every time the original fluent appears in the effect of an action, an effect is added to that action. The added effect assigns to the new fluent the negation of the value assigned to the original fluent.

This makes sure that every time the original fluent is modified, the new fluent is also modified with the opposite value, so the new fluent always represents the opposite of the original fluent and can be used instead of Not(original_fluent) in the conditions.
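The three steps above can be sketched in plain Python on ground actions. This is a simplified illustration rather than the library's implementation: the remove_negative_conditions function, the data layout and the "not_" naming of the fresh fluents are all hypothetical. A complete compiler would also have to give each fresh fluent an initial value opposite to that of the original fluent, which is omitted here.

```python
def remove_negative_conditions(actions):
    """Replace negated preconditions with fresh fluents.

    actions maps a name to (preconditions, effects), where preconditions is a
    list of literals ("f" or ("not", "f")) and effects maps a fluent to a bool.
    """
    # Step 1: create a fresh fluent for every fluent that appears negated.
    negated = {
        lit[1]
        for pre, _ in actions.values()
        for lit in pre
        if isinstance(lit, tuple)
    }
    fresh = {f: f"not_{f}" for f in negated}

    compiled = {}
    for name, (pre, eff) in actions.items():
        # Step 2: replace every negated literal with its fresh fluent.
        new_pre = [fresh[lit[1]] if isinstance(lit, tuple) else lit for lit in pre]
        # Step 3: mirror every effect on a tracked fluent with the opposite value.
        new_eff = dict(eff)
        for fluent, value in eff.items():
            if fluent in fresh:
                new_eff[fresh[fluent]] = not value
        compiled[name] = (new_pre, new_eff)
    return compiled, fresh


actions = {
    "switch_heater_on(heater_1)": (
        [("not", "used_heater(heater_1)"), ("not", "is_on(heater_1)")],
        {"is_warm": True, "is_on(heater_1)": True},
    ),
    "switch_heater_off(heater_1)": (
        ["is_on(heater_1)"],
        {"is_warm": False, "is_on(heater_1)": False, "used_heater(heater_1)": True},
    ),
}
compiled, fresh = remove_negative_conditions(actions)
for action_name, (pre, eff) in compiled.items():
    print(action_name, pre, eff)
```

After the rewrite, no precondition contains a negation, and switch_heater_off also sets not_is_on(heater_1) to True and not_used_heater(heater_1) to False, keeping each fresh fluent in sync with the original one.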

The following example shows how to get a compiler from the factory and use it to remove negative conditions:

[7]:
# Get the compiler from the factory
with Compiler(
        problem_kind = dcr_kind,
        compilation_kind = CompilationKind.NEGATIVE_CONDITIONS_REMOVING
    ) as negative_conditions_remover:
    # After we have the compiler, we get the compilation result
    ncr_result = negative_conditions_remover.compile(
        dcr_problem,
        CompilationKind.NEGATIVE_CONDITIONS_REMOVING
    )
    ncr_problem = ncr_result.problem
    ncr_kind = ncr_problem.kind

    # Check the result of the compilation
    assert original_problem_kind.has_negative_conditions()
    assert qr_kind.has_negative_conditions()
    assert cer_kind.has_negative_conditions()
    assert dcr_kind.has_negative_conditions()
    assert not ncr_kind.has_negative_conditions()

6.8. Solving the obtained problem with pyperplan

After all the compilers have been used in a pipeline, we can solve the problem with pyperplan.

6.8.1. Considerations on the plan obtained

As the printed plan below shows, the plan obtained cannot be used directly for the original problem, because it is expressed in terms of the compiled actions (for example switch_heater_on_0); for now, we can focus on the length of the plan. It has 6 action instances: intuitively, 3 steps repeated twice:
- switch one heater on
- get a job done
- switch the heater off

[8]:
# Get the planner from the factory
with OneshotPlanner(name = "pyperplan") as planner:
    assert planner.supports(ncr_kind)       # Make sure the planner supports the compiled problem
    ncr_plan = planner.solve(ncr_problem).plan  # Solve the problem and get the plan for the compiled problem
    print(ncr_plan)
    assert len(ncr_plan.actions) == 6
SequentialPlan:
    switch_heater_on_0(heater_1)
    perform_job_0(clean)
    switch_heater_off_0(heater_1)
    switch_heater_on_0(heater_2)
    perform_job_0(work)
    switch_heater_off_0(heater_2)

6.9. How to get a plan valid for the original problem

All the compilers we used provide the capability of rewriting an action instance of the compiled problem into an action instance of the input problem.

So, since we used a pipeline of 4 compilers, we have to rewrite the plan back 4 times.

To rewrite back a plan from the compiled problem to the input problem (respectively, compiled_plan and input_plan), we use 2 main features offered by the unified_planning framework:
- CompilationResult.map_back_action_instance, a field of type Callable[[ActionInstance], ActionInstance]. This function takes an ActionInstance of the compiled problem and returns the equivalent ActionInstance of the input problem.
- The Plan.replace_action_instances method, which takes exactly 1 argument of type Callable[[ActionInstance], ActionInstance] and creates a new plan where every action instance of the original plan is replaced with the result of the function given as parameter.

Using those 2 features allows us to easily get the equivalent plan for the input problem, and by following the compilers pipeline backwards we can get the plan for the original problem.
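Why the pipeline has to be followed backwards can be seen on a toy model in plain Python. Everything here is hypothetical and independent of the library: action instances are plain strings, each imaginary compilation step appends a suffix to the action names, and strip_suffix builds the matching map-back function.

```python
from functools import reduce


def strip_suffix(suffix):
    """Build a toy map-back function that undoes one renaming step."""
    def map_back(action):
        return action[: -len(suffix)] if action.endswith(suffix) else action
    return map_back


def map_back_plan(plan, map_backs):
    """Apply the map-back functions from the last compiler to the first."""
    return reduce(
        lambda current, map_back: [map_back(a) for a in current],
        reversed(map_backs),
        plan,
    )


# Map-back functions listed in compilation order: first "_q", then "_c".
map_backs = [strip_suffix("_q"), strip_suffix("_c")]
compiled_plan = ["switch_on_q_c", "perform_job_q_c"]

original = map_back_plan(compiled_plan, map_backs)
print(original)  # ['switch_on', 'perform_job']
```

Applying the same functions in compilation order instead would leave the "_q" suffix in place, because the first map-back would not recognize the names produced by the second compilation step.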

[9]:
from unified_planning.engines import ValidationResultStatus
# The ValidationResultStatus class is defined in the unified_planning/engines/results.py file

# Create the equivalent plan for the dcr_problem (the one created by the disjunctive conditions remover)
dcr_plan = ncr_plan.replace_action_instances(ncr_result.map_back_action_instance)

# Check to see if the plan is actually valid for the problem
print(dcr_kind)
with PlanValidator(problem_kind = dcr_kind) as validator:
    assert validator.validate(dcr_problem, dcr_plan).status == ValidationResultStatus.VALID
PROBLEM_CLASS: ['ACTION_BASED']
CONDITIONS_KIND: ['NEGATIVE_CONDITIONS']
TYPING: ['FLAT_TYPING', 'HIERARCHICAL_TYPING']

6.10. Final result

Now we repeat the process for all the remaining compilers we used.

As intended, the compilers allowed us to solve with pyperplan a problem that pyperplan could not handle directly.

[10]:
# Get the plan for the cer_problem
cer_plan = dcr_plan.replace_action_instances(dcr_result.map_back_action_instance)

# Get the plan for the qr_problem
qr_plan = cer_plan.replace_action_instances(cer_result.map_back_action_instance)

# Get the plan for the original problem
original_plan = qr_plan.replace_action_instances(qr_result.map_back_action_instance)

# Check to see if the obtained plan is actually valid for the original problem
with PlanValidator(problem_kind = original_problem_kind) as validator:
    assert validator.validate(original_problem, original_plan).status == ValidationResultStatus.VALID

print(original_plan)
SequentialPlan:
    switch_heater_on(heater_1)
    perform_job(clean)
    switch_heater_off(heater_1)
    switch_heater_on(heater_2)
    perform_job(work)
    switch_heater_off(heater_2)
[ ]:
from unified_planning.plot import plot_sequential_plan

Ignore the code below; it is used to make this notebook also runnable in the Continuous Integration.

[ ]:
# Redefine the plot package methods imported above to print the plot to a temp file
# if the exception "could not locate runnable browser" is raised. This usually happens
# in the Continuous Integration.

from inspect import getmembers, isfunction
from unified_planning import plot
from functools import partial
import os, uuid, tempfile as tf

# Define the function that will be executed instead
def _function(original_function, *args, **kwargs):
    try:
        original_function(*args, **kwargs)
    except Exception as e:
        if "could not locate runnable browser" in str(e):
            original_function(*args, **kwargs,
                filename=f"{os.path.join(tf.gettempdir(), str(uuid.uuid1()))}.png"
            )
        else:
            raise e

# Iterate over all the functions of the plot package
for function_name, function in getmembers(plot, isfunction):
    # Override the original function with the new one
    globals()[function_name] = partial(_function, function)
[ ]:
plot_sequential_plan(original_plan, figsize=(8, 20), node_size=21000, font_size=10, top_bottom=True)