{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "-O38cYGqUwCJ" }, "source": [ "# Compilers Example\n", "\n", "In this example we will create a unified_planning problem and then show how to use a compiler on it to obtain a new equivalent problem; then, we will get a plan for the compiled problem and translate it into the equivalent plan for the original problem.\n", "\n", "[![Open In GitHub](https://img.shields.io/badge/see-Github-579aca?logo=github)](https:///github.com/aiplan4eu/unified-planning/blob/master/docs/notebooks/05-compilers.ipynb)\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aiplan4eu/unified-planning/blob/master/docs/notebooks/05-compilers.ipynb)\n", "\n", "## Setup\n", "\n", "We start by installing the library with PIP" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "id": "BoqALxJWdfl8", "tags": [ "remove_from_CI" ] }, "outputs": [], "source": [ "!apt install graphviz graphviz-dev\n", "%pip install unified-planning[tamer,plot]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define the UP Problem\n", "\n", "For this example we will create a problem with the following features:\n", "- default temperature is cold\n", "- 2 jobs that can be done only if it is warm\n", "- 2 heaters with some quirks: \n", " 1) If both heaters are switched on at the same time, it will cause an electrical failure. \n", " 2) Once an heater is switched on, the heat it provides can be used only for one job, after that the heater will not provide heat anymore. \n", " 3) Every heater can be switched on only once.\n", "\n", "In the end we want every job done, no heaters switched on and no electrical failure." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from unified_planning.shortcuts import *\n", "\n", "# Define User Types\n", "Heater = UserType(\"Heater\")\n", "Job = UserType(\"Job\")\n", "Clean = UserType(\"Clean\", Job)\n", "Work = UserType(\"Work\", Job)\n", "\n", "# Define fluents\n", "is_cold = Fluent(\"is_cold\", BoolType()) # BoolType is the default, so it can be avoided\n", "is_warm = Fluent(\"is_warm\")\n", "electrical_failure = Fluent(\"electrical_failure\")\n", "job_done = Fluent(\"job_done\", BoolType(), job = Job)\n", "is_on = Fluent(\"is_on\", BoolType(), heater = Heater)\n", "used_heater = Fluent(\"used_heater\", BoolType(), heater = Heater)\n", "\n", "# Define actions\n", "switch_heater_on = InstantaneousAction(\"switch_heater_on\", heater = Heater)\n", "heater = switch_heater_on.parameter(\"heater\")\n", "switch_heater_on.add_precondition(Not(used_heater(heater))) # The heater must not have been already used \n", "switch_heater_on.add_precondition(Not(is_on(heater))) # The heater must not be already on\n", "switch_heater_on.add_effect(is_warm, True) # The temperature becomes warm\n", "switch_heater_on.add_effect(is_on(heater), True) # The heater switches on\n", "# Define a Variable of type \"Heater\", used for the existential condition\n", "h_var = Variable(\"h_var\", Heater)\n", "# If exists an heater that is already on, we have an electrical failure\n", "switch_heater_on.add_effect(electrical_failure, True, Exists(is_on(h_var), h_var)) \n", "\n", "switch_heater_off = InstantaneousAction(\"switch_heater_off\", heater = Heater)\n", "heater = switch_heater_off.parameter(\"heater\")\n", "switch_heater_off.add_precondition(is_on(heater)) # The heater must be on\n", "switch_heater_off.add_effect(is_warm, False) # It is not warm anymore\n", "switch_heater_off.add_effect(is_cold, True) # It becomes cold\n", "switch_heater_off.add_effect(is_on(heater), False) # The heater turns off\n", "switch_heater_off.add_effect(used_heater(heater), True) # The heater becomes used\n", "\n", "perform_job = InstantaneousAction(\"perform_job\", job = Job)\n", "job = perform_job.parameter(\"job\")\n", "perform_job.add_precondition(is_warm) # Must be warm to do the job\n", "perform_job.add_effect(is_warm, False) # It is not warm anymore\n", "perform_job.add_effect(is_cold, True) # It becomes cold again\n", "perform_job.add_effect(job_done(job), True) # The job is done\n", "\n", "# define objects\n", "heater_1 = Object(\"heater_1\", Heater)\n", "heater_2 = Object(\"heater_2\", Heater)\n", "\n", "clean = Object(\"clean\", Clean)\n", "work = Object(\"work\", Work)\n", "\n", "# define the problem\n", "original_problem = Problem(\"heaters_and_jobs\")\n", "# add the fluents to the problem\n", "original_problem.add_fluent(is_cold, default_initial_value = True)\n", "original_problem.add_fluent(is_warm, default_initial_value = False)\n", "original_problem.add_fluent(electrical_failure, default_initial_value = False)\n", "original_problem.add_fluent(job_done, default_initial_value = False)\n", "original_problem.add_fluent(is_on, default_initial_value = False)\n", "original_problem.add_fluent(used_heater, default_initial_value = False)\n", "# add objects and actions\n", "original_problem.add_objects([heater_1, heater_2, clean, work])\n", "original_problem.add_actions([switch_heater_on, switch_heater_off, perform_job])\n", "\n", "# define the problem goals\n", "original_problem.add_goal(Not(electrical_failure)) # No electrical failure\n", "j_var = Variable(\"j_var\", Job) \n", "original_problem.add_goal(Forall(job_done(j_var), j_var)) # All jobs are done\n", "original_problem.add_goal(Forall(Not(is_on(h_var)), h_var)) # All heaters are switched off\n", "\n", "original_problem_kind = original_problem.kind" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Testing Compilers\n", "\n", "To show the usage and the capabilities of the compilers, we will take the problem we just defined and pyperplan, a planner that does not support some features of the original problem. \n", "\n", "With the use of compilers, pyperplan will be able to solve the equivalent compiled problem and then we will rewrite the plan back to be a plan of the original problem.\n", "\n", "### Get pyperplan solver\n", "\n", "We will now get the pyperplan engine and show that the original problem is not supported." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[96m\u001b[1mNOTE: To disable printing of planning engine credits, add this line to your code: `up.shortcuts.get_environment().credits_stream = None`\n", "\u001b[0m\u001b[96m *** Credits ***\n", "\u001b[0m\u001b[96m * In operation mode `OneshotPlanner` at line 1 of `/tmp/ipykernel_155649/2508516188.py`, \u001b[0m\u001b[96myou are using the following planning engine:\n", "\u001b[0m\u001b[96m * Engine name: pyperplan\n", " * Developers: Albert-Ludwigs-Universität Freiburg (Yusra Alkhazraji, Matthias Frorath, Markus Grützner, Malte Helmert, Thomas Liebetraut, Robert Mattmüller, Manuela Ortlieb, Jendrik Seipp, Tobias Springenberg, Philip Stahl, Jan Wülfing)\n", "\u001b[0m\u001b[96m * Description: \u001b[0m\u001b[96mPyperplan is a lightweight STRIPS planner written in Python.\u001b[0m\u001b[96m\n", "\u001b[0m\u001b[96m\n", "\u001b[0m" ] } ], "source": [ "with OneshotPlanner(name = \"pyperplan\") as planner:\n", " assert not planner.supports(original_problem_kind)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## First compilation kind: Quantifiers Removing\n", "\n", "The compilation kind `QUANTIFIERS_REMOVING` takes a problem that might have a quantifier operand, thus an `Exists` or a `Forall`, and returns an equivalent problem that does not contain any `Exists` or `Forall`. \n", "\n", "The quantifiers in the problem can be found in:\n", "- an action's condition (or precondition)\n", "- the condition of an action's conditional effect\n", "- the condition of a timed conditional effect\n", "- a goal or a timed goal \n", "- the assignment value of an effect\n", "\n", "### Example\n", "\n", "In the `unified_planning` we have a compiler that implements this specific compilation kind: the `unified_planning.engines.compilers.QuantifiersRemover`.\n", "It works by taking every expression in the problem and grounding the quantifiers (`Exists` and `Forall`) with the equivalent formula in the problem. \n", "\n", "For example, in our problem, the goal `Forall(job_done(j_var), j_var)` is equivalent to `job_done(clean) and job_done(work)`, because there are only 2 possible values that the `j_var` variable can have.\n", "It is interesting to note how the `j_var` variable, of type `Job`, is instantiated to 2 objects of type `Clean` and `Work`, due to the typing inheritance.\n", "\n", "Here we have an example on how to get from the factory and use a compiler to remove quantifiers:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from unified_planning.engines import CompilationKind\n", "# The CompilationKind class is defined in the unified_planning/engines/mixins/compiler.py file\n", "\n", "# To get the Compiler from the factory we can use the Compiler operation mode.\n", "# It takes a problem_kind and a compilation_kind, and returns a compiler with the capabilities we need\n", "with Compiler(\n", " problem_kind = original_problem_kind, \n", " compilation_kind = CompilationKind.QUANTIFIERS_REMOVING\n", " ) as quantifiers_remover:\n", " # After we have the compiler, we get the compilation result\n", " qr_result = quantifiers_remover.compile(\n", " original_problem, \n", " CompilationKind.QUANTIFIERS_REMOVING\n", " )\n", " qr_problem = qr_result.problem\n", " qr_kind = qr_problem.kind\n", " \n", " # Check the result of the compilation\n", " assert original_problem_kind.has_existential_conditions() and original_problem_kind.has_universal_conditions()\n", " assert not qr_kind.has_existential_conditions() and not qr_kind.has_universal_conditions()\n", " " ] }, { "cell_type": "markdown", "metadata": { "id": "s_lwgr5pVEsC" }, "source": [ "## Conditional Effects Removing\n", "\n", "The conditional effects removing compilation kind takes a problem with conditional effects and returns an equivalent problem without any conditional effect.\n", "\n", "The conditional effects can be found in:\n", "- action's effects\n", "- timed effects\n", "\n", "### Example\n", "\n", "In the `unified_planning` we have a compiler that implements this specific compilation kind: the `unified_planning.engines.compilers.ConditionalEffectsRemover`.\n", "\n", "It works by taking every action with at least one conditional effect and creating `2^N` actions, where `N` is the number of conditional effects in the action.\n", "For each conditional effect, we have 2 variants:\n", "- one where the condition of the conditional effect becomes an action precondition and the effect of the conditional effects becomes a normal effect of the action\n", "- one where the inverse of the condition of the conditional effect becomes an action precondition.\n", "\n", "All the possible combinations of those 2 variants of conditional effects are added, and then impossible or empty actions are removed.\n", "\n", "Here we have an example on how to get from the factory and use a compiler to remove conditional effects:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "id": "01nDJbkoVZU1" }, "outputs": [], "source": [ "# Get the compiler from the factory\n", "with Compiler(\n", " problem_kind = qr_kind, \n", " compilation_kind = CompilationKind.CONDITIONAL_EFFECTS_REMOVING\n", " ) as conditional_effects_remover:\n", " # After we have the compiler, we get the compilation result\n", " cer_result = conditional_effects_remover.compile(\n", " qr_problem, \n", " CompilationKind.CONDITIONAL_EFFECTS_REMOVING\n", " )\n", " cer_problem = cer_result.problem\n", " cer_kind = cer_problem.kind\n", " \n", " # Check the result of the compilation\n", " assert original_problem_kind.has_conditional_effects()\n", " assert qr_kind.has_conditional_effects()\n", " assert not cer_kind.has_conditional_effects()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Disjunctive Conditions Removing\n", "\n", "The disjunctive conditions removing compilation kind takes a problem that might have a complex expressions as an action condition or in the goals and returns an equivalent problem where every condition becomes only a conjunction of terms (an `And` of terms). Where each term can be a fluent or the negation of a fluent and not a complex expression.\n", "\n", "The complex conditions can be found in:\n", "- action's conditions (or preconditions)\n", "- the condition of an action's conditional effect\n", "- the condition of a timed conditional effect\n", "- problem's goals\n", "- problem's timed goals\n", "\n", "### Example\n", "\n", "In the `unified_planning` we have a compiler that implements this specific compilation kind: the `unified_planning.engines.compilers.DisjunctiveConditionsRemover`.\n", "It modifies all the actions by making a unique `And` containing all the action's preconditions, computes the equivalent formula of the original `And` as a disjunction of conjunctions (an `Or` of `Ands`), and then creates an action for every element of the resulting `Or`. \n", "Each resulting action has the same effects of the original action and one element of the `Or` as a precondition.\n", "\n", "A similar pattern is used to remove disjunctions from the problem goals, but it involves adding a fresh fluent and, for every element of the resulting `Or`, a fresh action.\n", "\n", "Here we have an example on how to get from the factory and use a compiler to remove disjunctive conditions:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Get the compiler from the factory\n", "with Compiler(\n", " problem_kind = cer_kind,\n", " compilation_kind = CompilationKind.DISJUNCTIVE_CONDITIONS_REMOVING\n", " ) as disjunctive_conditions_remover:\n", " # After we have the compiler, we get the compilation result\n", " dcr_result = disjunctive_conditions_remover.compile(\n", " cer_problem, \n", " CompilationKind.DISJUNCTIVE_CONDITIONS_REMOVING\n", " )\n", " dcr_problem = dcr_result.problem\n", " dcr_kind = dcr_problem.kind\n", " \n", " # Check the result of the compilation\n", " assert qr_kind.has_disjunctive_conditions()\n", " assert cer_kind.has_disjunctive_conditions()\n", " assert not dcr_kind.has_disjunctive_conditions()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Negative Conditions Removing\n", "\n", "The negative conditions removing compilation kind takes a problem that has the `Not` operand in any problem condition and returns an equivalent problem where the `Not` doesn't appear in the problem's conditions.\n", "\n", "The `Not` operand must be removed from:\n", "- the action's conditions (or preconditions)\n", "- the condition of every action's conditional effects\n", "- the condition of every timed conditional effects\n", "- problem's goals\n", "- problem's timed goals\n", "\n", "### Example\n", "\n", "In the `unified_planning` we have a compiler that implements this specific compilation kind: the `unified_planning.engines.compilers.DisjunctiveConditionsRemover`.\n", "The compiling process followed in the UP implementation is the following:\n", "- For every fluent that appears negated in the conditions or in the goals, a new fluent that represents the same fluent negated is created. \n", "- Every time the original fluent appears negated in an expression, it is replaced with the new fluent.\n", "- Every time the original fluent appears in the effect of an action, an effect is added to that action. The added effect assigns to the new fluent the negated value assigned to the original fluent; this makes sure that every time the original fluent is modified, also the new fluent is modified with the inverse value, so the new fluent created always represents the opposite of the original fluent and it is used instead of the `Not(original_fluent)` in the conditions.\n", "\n", "Here we have an example on how to get from the factory and use a compiler to remove negative conditions:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# Get the compiler from the factory\n", "with Compiler(\n", " problem_kind = dcr_kind,\n", " compilation_kind = CompilationKind.NEGATIVE_CONDITIONS_REMOVING\n", " ) as negative_conditions_remover:\n", " # After we have the compiler, we get the compilation result\n", " ncr_result = negative_conditions_remover.compile(\n", " dcr_problem, \n", " CompilationKind.NEGATIVE_CONDITIONS_REMOVING\n", " )\n", " ncr_problem = ncr_result.problem\n", " ncr_kind = ncr_problem.kind\n", " \n", " # Check the result of the compilation\n", " assert original_problem_kind.has_negative_conditions()\n", " assert qr_kind.has_negative_conditions()\n", " assert cer_kind.has_negative_conditions()\n", " assert dcr_kind.has_negative_conditions()\n", " assert not ncr_kind.has_negative_conditions()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Solving the obtained problem with pyperplan\n", "\n", "After all the compilers have been used in a pipeline, we can solve the problem with pyperplan.\n", "\n", "### Considerations on the plan obtained\n", "\n", "As we can see in the simulation, the plan obtained makes no sense for the original plan; but we can focus on the length of the plan. We see it has 6 action instances, intuitively, 3 steps repeated twice: \n", "- switch one heater on\n", "- get a job done\n", "- switch the heater off" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[96m *** Credits ***\n", "\u001b[0m\u001b[96m * In operation mode `OneshotPlanner` at line 2 of `/tmp/ipykernel_155649/1542274702.py`, \u001b[0m\u001b[96myou are using the following planning engine:\n", "\u001b[0m\u001b[96m * Engine name: pyperplan\n", " * Developers: Albert-Ludwigs-Universität Freiburg (Yusra Alkhazraji, Matthias Frorath, Markus Grützner, Malte Helmert, Thomas Liebetraut, Robert Mattmüller, Manuela Ortlieb, Jendrik Seipp, Tobias Springenberg, Philip Stahl, Jan Wülfing)\n", "\u001b[0m\u001b[96m * Description: \u001b[0m\u001b[96mPyperplan is a lightweight STRIPS planner written in Python.\u001b[0m\u001b[96m\n", "\u001b[0m\u001b[96m\n", "\u001b[0mSequentialPlan:\n", " switch_heater_on_0(heater_1)\n", " perform_job_0(clean)\n", " switch_heater_off_0(heater_1)\n", " switch_heater_on_0(heater_2)\n", " perform_job_0(work)\n", " switch_heater_off_0(heater_2)\n" ] } ], "source": [ "# Get the planner from the factory\n", "with OneshotPlanner(name = \"pyperplan\") as planner:\n", " assert planner.supports(ncr_kind) # Make sure the planner supports the compiled problem\n", " ncr_plan = planner.solve(ncr_problem).plan # Solve the problem and get the plan for the compiled problem\n", " print(ncr_plan)\n", " assert len(ncr_plan.actions) == 6" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## How to get a plan valid for the original problem\n", "\n", "All the compilers we used provide the capabilities of rewriting an action instance of the compiled problem into an action instance of the input problem. \n", "\n", "So, since we used a pipeline of 4 compilers, we have to rewrite back the plan 4 times.\n", "\n", "To rewrite back a plan from the compiled problem to the input problem (respectively, compiled_plan and input_plan), we use 2 main features offered by the unified_planning_framework:\n", "- The `CompilationResult.map_back_action_instance`, a field of type: `Callable[[ActionInstance], ActionInstance]`. This function takes an `ActionInstance` of the compiled problem and returns the equivalent `ActionInstance` of the input problem.\n", "- The `Plan.replace_action_instances` method, which takes exactly 1 argument of type `Callable[[ActionInstance], ActionInstance]`, and creates a new plan where every action instance of the original plan is replaced with the result given by the function given as parameter.\n", "\n", "Using those 2 features allows us to easily get the equivalent plan for the input problem, and by following the compilers pipeline backwards we can get the plan for the original problem." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "PROBLEM_CLASS: ['ACTION_BASED']\n", "CONDITIONS_KIND: ['NEGATIVE_CONDITIONS']\n", "TYPING: ['FLAT_TYPING', 'HIERARCHICAL_TYPING']\n" ] } ], "source": [ "from unified_planning.engines import ValidationResultStatus\n", "# The ValidationResultStatus class is defined in the unified_planning/engines/results.py file\n", "\n", "# Create the equivalent plan for the dcr_problem (the one created by the disjunctive conditions remover)\n", "dcr_plan = ncr_plan.replace_action_instances(ncr_result.map_back_action_instance)\n", "\n", "# Check to see if the plan is actually valid for the problem\n", "print(dcr_kind)\n", "with PlanValidator(problem_kind = dcr_kind) as validator:\n", " assert validator.validate(dcr_problem, dcr_plan).status == ValidationResultStatus.VALID" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Final result\n", "\n", "Now repeat the process for all the compilers we used.\n", "\n", "As we wanted to achieve, with the use of the compilers we managed to solve a problem with pyperplan, when pyperplan was not able to solve said problem." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "SequentialPlan:\n", " switch_heater_on(heater_1)\n", " perform_job(clean)\n", " switch_heater_off(heater_1)\n", " switch_heater_on(heater_2)\n", " perform_job(work)\n", " switch_heater_off(heater_2)\n" ] } ], "source": [ "# Get the plan for the cer_problem\n", "cer_plan = dcr_plan.replace_action_instances(dcr_result.map_back_action_instance)\n", "\n", "# Get the plan for the qr_problem\n", "qr_plan = cer_plan.replace_action_instances(cer_result.map_back_action_instance)\n", "\n", "# Get the plan for the original problem\n", "original_plan = qr_plan.replace_action_instances(qr_result.map_back_action_instance)\n", "\n", "# Check to see if the obtained plan is actually valid for the original problem\n", "with PlanValidator(problem_kind = original_problem_kind) as validator:\n", " assert validator.validate(original_problem, original_plan).status == ValidationResultStatus.VALID\n", "\n", "print(original_plan)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "from unified_planning.plot import plot_sequential_plan" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ignore the code below, it's used to make this notebook also runnable in the Countinuous Intergation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Redefine the plot package methods imported above to print the plot to a temp file\n", "# if the exception \"could not locate runnable browser\" is raised. This usually happens\n", "# in the Continuous Integration.\n", "\n", "from inspect import getmembers, isfunction\n", "from unified_planning import plot\n", "from functools import partial\n", "import os, uuid, tempfile as tf\n", "\n", "# Define the function that will be executed instead\n", "def _function(original_function, *args, **kwargs):\n", " try:\n", " original_function(*args, **kwargs)\n", " except Exception as e:\n", " if \"could not locate runnable browser\" in str(e):\n", " original_function(*args, **kwargs,\n", " filename=f\"{os.path.join(tf.gettempdir(), str(uuid.uuid1()))}.png\"\n", " )\n", " else:\n", " raise e\n", "\n", "# Iterate over all the functions of the plot package\n", "for function_name, function in getmembers(plot, isfunction):\n", " # Override the original function with the new one\n", " globals()[function_name] = partial(_function, function)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot_sequential_plan(original_plan, figsize=(8, 20), node_size=21000, font_size=10, top_bottom=True)" ] } ], "metadata": { "celltoolbar": "Tags", "colab": { "collapsed_sections": [], "name": "Compilers", "provenance": [] }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" }, "vscode": { "interpreter": { "hash": "fcfc934ecfdac8ddac62d6a80ba8d82faf47dc8d54fd6a313f0c016b85ebec0e" } } }, "nbformat": 4, "nbformat_minor": 1 }