{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "-O38cYGqUwCJ" }, "source": [ "# Planner Integration\n", "\n", "In this simple demo we create a new planning engine for the \"Oneshot\" operation mode, register it in the UP library, and test it on a simple problem.\n", "\n", "[![Open In GitHub](https://img.shields.io/badge/see-Github-579aca?logo=github)](https://github.com/aiplan4eu/unified-planning/blob/master/docs/notebooks/tutorial/planner-integration.ipynb)\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aiplan4eu/unified-planning/blob/master/docs/notebooks/tutorial/planner-integration.ipynb)\n", "\n", "**_NOTE:_** The unified planning library has auxiliary base classes for **integrating stand-alone PDDL planners** (not described in this notebook). They take care of writing the PDDL files, calling your planner and collecting the plan from disk on various operating systems.\n", "For oneshot planning, use [*engines.PDDLPlanner*](https://github.com/aiplan4eu/unified-planning/blob/master/unified_planning/engines/pddl_planner.py). For anytime planning there is [*engines.PDDLAnytimePlanner*](https://github.com/aiplan4eu/unified-planning/blob/master/unified_planning/engines/pddl_anytime_planner.py).\n", "\n", "## Setup\n", "We start by installing the library with pip." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "BoqALxJWdfl8", "tags": [ "remove_from_CI" ] }, "outputs": [], "source": [ "%pip install unified-planning" ] }, { "cell_type": "markdown", "metadata": { "id": "s_lwgr5pVEsC" }, "source": [ "## Define the planning engine\n", "A planning engine is a class inheriting from the `unified_planning.engines.Engine` class. The class must also inherit from at least one of the mixin classes in `unified_planning.engines.mixins`. 
\n", "\n", "Custom options can be passed as keyword arguments in the constructor, and each planning engine must have a name, specified by implementing the `name` property.\n", "\n", "In order for the UP library to filter applicable engines, the `supports` method must be implemented. The method must return `True` if the engine is capable of dealing with the given `ProblemKind`, a class storing information on the problem class (much like the `:requirements` specification of PDDL).\n", "\n", "In this example, we create a simple `OneshotPlanner` (see `unified_planning.engines.factory` for details and other operation modes). A `OneshotPlanner` is an instance of `unified_planning.engines.Engine` and `unified_planning.engines.mixins.OneshotPlannerMixin` that implements the `_solve()` method (invoked through the public `solve()` method), returning a `up.engines.results.PlanGenerationResult` that contains a plan in the form of an instance (or subclass) of `unified_planning.plans.Plan` if a plan could be found. In the code below, we implement a simple random walker that uses a `PlanValidator` (another operation mode) as a way to check if prefixes of plans are valid.\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "id": "01nDJbkoVZU1" }, "outputs": [], "source": [ "import random\n", "from typing import Callable, IO, Optional\n", "import unified_planning as up\n", "from unified_planning import engines\n", "\n", "class MySolverImpl(up.engines.Engine,\n", "                   up.engines.mixins.OneshotPlannerMixin):\n", "    def __init__(self, **options):\n", "        # Read known user options and store them for use in the `_solve` method\n", "        up.engines.Engine.__init__(self)\n", "        up.engines.mixins.OneshotPlannerMixin.__init__(self)\n", "        self.max_tries = options.get('max_tries', None)\n", "        self.restart_probability = options.get('restart_probability', 0.00001)\n", "\n", "    @property\n", "    def name(self) -> str:\n", "        return \"YOLOPlanner\"\n", "\n", "    @staticmethod\n", "    def supported_kind():\n", "        # For this demo we limit 
ourselves to numeric planning.\n", "        # Other kinds of problems can be modeled in the UP library,\n", "        # see unified_planning.model.problem_kind.\n", "        supported_kind = up.model.ProblemKind()\n", "        supported_kind.set_problem_class(\"ACTION_BASED\")\n", "        supported_kind.set_problem_type(\"GENERAL_NUMERIC_PLANNING\")\n", "        supported_kind.set_typing('FLAT_TYPING')\n", "        supported_kind.set_typing('HIERARCHICAL_TYPING')\n", "        supported_kind.set_numbers('CONTINUOUS_NUMBERS')\n", "        supported_kind.set_numbers('DISCRETE_NUMBERS')\n", "        supported_kind.set_fluents_type('NUMERIC_FLUENTS')\n", "        supported_kind.set_numbers('BOUNDED_TYPES')\n", "        supported_kind.set_fluents_type('OBJECT_FLUENTS')\n", "        supported_kind.set_conditions_kind('NEGATIVE_CONDITIONS')\n", "        supported_kind.set_conditions_kind('DISJUNCTIVE_CONDITIONS')\n", "        supported_kind.set_conditions_kind('EQUALITIES')\n", "        supported_kind.set_conditions_kind('EXISTENTIAL_CONDITIONS')\n", "        supported_kind.set_conditions_kind('UNIVERSAL_CONDITIONS')\n", "        supported_kind.set_effects_kind('CONDITIONAL_EFFECTS')\n", "        supported_kind.set_effects_kind('INCREASE_EFFECTS')\n", "        supported_kind.set_effects_kind('DECREASE_EFFECTS')\n", "        supported_kind.set_effects_kind('FLUENTS_IN_NUMERIC_ASSIGNMENTS')\n", "\n", "        return supported_kind\n", "\n", "    @staticmethod\n", "    def supports(problem_kind):\n", "        return problem_kind <= MySolverImpl.supported_kind()\n", "\n", "    def _solve(self, problem: 'up.model.Problem',\n", "               callback: Optional[Callable[['up.engines.PlanGenerationResult'], None]] = None,\n", "               timeout: Optional[float] = None,\n", "               output_stream: Optional[IO[str]] = None) -> 'up.engines.PlanGenerationResult':\n", "        env = problem.environment\n", "\n", "        # First we ground the problem\n", "        with env.factory.Compiler(problem_kind=problem.kind, compilation_kind=up.engines.CompilationKind.GROUNDING) as grounder:\n", "            grounding_result = grounder.compile(problem, up.engines.CompilationKind.GROUNDING)\n", "            grounded_problem = 
grounding_result.problem\n", "\n", "        # We store the grounded actions in a list\n", "        actions = list(grounded_problem.instantaneous_actions)\n", "\n", "        # The candidate plan, initially empty\n", "        plan = up.plans.SequentialPlan([])\n", "\n", "        # Ask for an instance of a PlanValidator by name\n", "        # (`sequential_plan_validator` is a Python implementation of the\n", "        # PlanValidator operation mode offered by the UP library)\n", "        with env.factory.PlanValidator(name='sequential_plan_validator') as pv:\n", "            counter = 0\n", "            while True:\n", "                # With a certain probability, restart from scratch to avoid dead-ends\n", "                if random.random() < self.restart_probability:\n", "                    # SequentialPlan requires the list of actions, so pass an empty one\n", "                    plan = up.plans.SequentialPlan([])\n", "                else:\n", "                    # Select a random action\n", "                    a = random.choice(actions)\n", "                    # Create the corresponding action instance\n", "                    ai = up.plans.ActionInstance(a)\n", "                    # Append the action to the plan\n", "                    plan.actions.append(ai)\n", "\n", "                    # Check plan validity\n", "                    res = pv.validate(grounded_problem, plan)\n", "                    if res:\n", "                        # If the plan is valid, lift the action instances and\n", "                        # return the resulting plan\n", "                        resplan = plan.replace_action_instances(grounding_result.map_back_action_instance)\n", "                        # Sanity check\n", "                        assert pv.validate(problem, resplan)\n", "                        status = up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING\n", "                        return up.engines.PlanGenerationResult(status, resplan, self.name)\n", "                    else:\n", "                        # If the plan is invalid, check if the reason is action\n", "                        # applicability (as opposed to goal satisfaction)\n", "                        einfo = res.log_messages[0].message\n", "                        if 'Goals' not in einfo:\n", "                            # If the plan is not executable, remove the last action\n", "                            plan.actions.pop()\n", "                # Limit the number of tries, according to the user specification\n", "                counter += 1\n", "                if self.max_tries is not None and counter >= self.max_tries:\n", "                    status = up.engines.PlanGenerationResultStatus.TIMEOUT\n", "                    return up.engines.PlanGenerationResult(status, None, 
self.name)\n", "\n", "    def destroy(self):\n", "        pass" ] }, { "cell_type": "markdown", "metadata": { "id": "-fFY3Q3mZo4P" }, "source": [ "Congratulations! You just created an integrated planning engine for the `Oneshot` operation mode! Of course, in a more realistic scenario, one would need to read the `problem` object and call an external engine or provide a more involved algorithm to provide the service, but this is already sufficient for testing out our simple engine." ] }, { "cell_type": "markdown", "metadata": { "id": "1xp1How6aClP" }, "source": [ "## Registering the engine\n", "\n", "In order to use our `YOLOPlanner`, we need to register it among the set of planning engines available for the UP library as follows." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "id": "nY1d3eK7amBP" }, "outputs": [], "source": [ "env = up.environment.get_environment()\n", "env.factory.add_engine('yoloplanner', __name__, 'MySolverImpl')" ] }, { "cell_type": "markdown", "metadata": { "id": "kGhQuSjTanEM" }, "source": [ "Essentially, we just need to give a custom name (in our case, `yoloplanner`), a module name (in this case, `__name__`, as we are in the same file as the solver class) and finally the class name that we used to define our planning engine.\n", "\n", "Done! We are now ready to test our planning engine!" 
] }, { "cell_type": "markdown", "metadata": { "id": "f5IWceEGa_YA" }, "source": [ "## Testing the engine\n", "\n", "We start by defining a simple problem in the UP syntax (alternatively, we can use a parser or any other way to create a problem object)." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from unified_planning.shortcuts import *\n", "\n", "problem = Problem('robot')\n", "Location = UserType('Location')\n", "robot_at = problem.add_fluent('robot_at', BoolType(), loc=Location)\n", "battery_charge = problem.add_fluent('battery_charge', RealType(0, 100))\n", "\n", "move = InstantaneousAction('move', l_from=Location, l_to=Location)\n", "l_from = move.l_from\n", "l_to = move.l_to\n", "move.add_precondition(battery_charge >= 10)\n", "move.add_precondition(Not(Equals(l_from, l_to)))\n", "move.add_precondition(robot_at(l_from))\n", "move.add_effect(robot_at(l_from), False)\n", "move.add_effect(robot_at(l_to), True)\n", "move.add_effect(battery_charge, battery_charge - 10)\n", "problem.add_action(move)\n", "\n", "l1 = problem.add_object('l1', Location)\n", "l2 = problem.add_object('l2', Location)\n", "problem.set_initial_value(robot_at(l1), True)\n", "problem.set_initial_value(robot_at(l2), False)\n", "problem.set_initial_value(battery_charge, 100)\n", "problem.add_goal(robot_at(l2))" ] }, { "cell_type": "markdown", "metadata": { "id": "p2L0ogodbLZw" }, "source": [ "Then, we invoke the operation mode as for any other planner in the UP library!" 
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "YOLOPlanner found a valid plan!\n", "SequentialPlan:\n", "    move(l1, l2)\n" ] } ], "source": [ "with OneshotPlanner(name='yoloplanner', params={'max_tries': 5}) as p:\n", "    result = p.solve(problem)\n", "    if result.status == up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING:\n", "        print(f'{p.name} found a valid plan!')\n", "        print(result.plan)\n", "    else:\n", "        print('No plan found!')\n" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "Planner Integration", "provenance": [] }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" }, "vscode": { "interpreter": { "hash": "fcfc934ecfdac8ddac62d6a80ba8d82faf47dc8d54fd6a313f0c016b85ebec0e" } } }, "nbformat": 4, "nbformat_minor": 1 }