{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "6nOTljC_mTMn" }, "source": [ "# SequentialSimulator\n", "\n", "This Python notebook shows how to use the `SequentialSimulator`.\n", "\n", "\n", "[![Open In GitHub](https://img.shields.io/badge/see-Github-579aca?logo=github)](https://github.com/aiplan4eu/unified-planning/blob/master/docs/notebooks/08-sequential-simulator.ipynb)\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aiplan4eu/unified-planning/blob/master/docs/notebooks/08-sequential-simulator.ipynb)\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "t8dCcpf7mivV" }, "source": [ "## Setup" ] }, { "cell_type": "markdown", "metadata": { "id": "CwlvEzKrm1jT" }, "source": [ "First, we install the unified_planning library with its `plot` extra from PyPI, along with the system `graphviz` packages." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "BoqALxJWdfl8", "scrolled": false, "tags": [ "remove_from_CI" ] }, "outputs": [], "source": [ "!apt install graphviz graphviz-dev\n", "%pip install unified-planning[plot]" ] }, { "cell_type": "markdown", "metadata": { "id": "iNHFHxQKnKIp" }, "source": [ "We are now ready to use the Unified-Planning library!" ] }, { "cell_type": "markdown", "metadata": { "id": "9dP5scv7nNJu" }, "source": [ "## Demo\n" ] }, { "cell_type": "markdown", "metadata": { "id": "DXhHD7EAmxVx" }, "source": [ "### Basic imports\n", "The basic imports we need for this demo are provided by the `shortcuts` module."
] }, { "cell_type": "code", "execution_count": 2, "metadata": { "id": "fiRk2F1llJO-" }, "outputs": [], "source": [ "from unified_planning.shortcuts import *" ] }, { "cell_type": "markdown", "metadata": { "id": "fXA1NuXgkntV" }, "source": [ "### Problem definition" ] }, { "cell_type": "markdown", "metadata": { "id": "JXDIDNwCm13b" }, "source": [ "We start modeling the problem by defining the `UserType` and the `Fluent`s." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "id": "nHXzkD2LmWkB" }, "outputs": [], "source": [ "Location = UserType('Location')\n", "\n", "at = Fluent('at', Location)\n", "distance = Fluent('distance', IntType(), l1=Location, l2=Location)\n", "battery_charge = Fluent('battery_charge', IntType(0, 100))" ] }, { "cell_type": "markdown", "metadata": { "id": "OVUn5VPTmXKY" }, "source": [ "We define an action `move` that decreases the `battery_charge` by the `distance` of the move.\n", "\n", "Note that `battery_charge` is bounded to the range from 0 to 100, so any action that would bring it below 0 is not applicable." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "id": "cwXh99K2laqv" }, "outputs": [], "source": [ "move = InstantaneousAction('move', l_from=Location, l_to=Location)\n", "l_from = move.parameter('l_from')\n", "l_to = move.parameter('l_to')\n", "move.add_precondition(Equals(at, l_from))\n", "move.add_effect(at, l_to)\n", "move.add_decrease_effect(battery_charge, distance(l_from, l_to))" ] }, { "cell_type": "markdown", "metadata": { "id": "C92YH1nrmRb7" }, "source": [ "Finally, we define the `Object` instances and, after creating the `Problem`, we set the initial values and the goal.\n", "\n", "We set the default value of `distance` to `101`, so that only the distances we define explicitly create valid connections between locations. (A move along any other connection would drop `battery_charge` below 0, violating its type bound.)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "id": "MczotRommR_k" }, "outputs": [], "source": [ "l1 = Object('l1', Location)\n", "l2 = Object('l2', Location)\n", "l3 = Object('l3', Location)\n", "l4 = Object('l4', Location)\n", "l5 = Object('l5', Location)\n", "locations = [l5, l4, l3, l2, l1]\n", "\n", "problem = Problem('moving_robot')\n", "problem.add_fluent(at)\n", "problem.add_fluent(battery_charge)\n", "problem.add_fluent(distance, default_initial_value=101)\n", "problem.add_action(move)\n", "problem.add_objects(locations)\n", "\n", "problem.set_initial_value(at, l1)\n", "problem.set_initial_value(battery_charge, 100)\n", "\n", "problem.set_initial_value(distance(l1, l2), 20)\n", "problem.set_initial_value(distance(l2, l3), 30)\n", "problem.set_initial_value(distance(l3, l4), 20)\n", "problem.set_initial_value(distance(l4, l5), 30)\n", "\n", "problem.set_initial_value(distance(l1, l3), 60)\n", "\n", "problem.add_goal(Equals(at, l5))\n", "battery_exp = FluentExp(battery_charge)" ] }, { "cell_type": "markdown", "metadata": { "id": "pXvh83ljlabv" }, "source": [ "### Simulating the problem" ] }, { "cell_type": "markdown", "metadata": { "id": "McV4znupqpkw" }, "source": [ "We get a `simulator` through the `SequentialSimulator` operation mode and start simulating.\n", "\n", "Since we have to reach `l5`, we iterate over the locations and check which ones we can reach from `l1` using the `simulator.is_applicable` method."
] }, { "cell_type": "code", "execution_count": 6, "metadata": { "id": "bXHWJh2vl5RJ", "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "From l1 we can reach: l3\n", "From l1 we can reach: l2\n" ] } ], "source": [ "with SequentialSimulator(problem=problem) as simulator:\n", "    initial_state = simulator.get_initial_state()\n", "    for travel_location in locations:\n", "        if simulator.is_applicable(initial_state, move, (l1, travel_location)):\n", "            print(f\"From l1 we can reach: {travel_location}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we see, from `l1` we can reach `l2` and `l3`; since `l3` seems closer to `l5`, we decide to go there!" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "From l3 we can reach: l4\n", "Problem! Battery too low: 20\n" ] } ], "source": [ "state_at_l3 = simulator.apply(initial_state, move, (l1, l3))\n", "for travel_location in locations:\n", "    if simulator.is_applicable(state_at_l3, move, (l3, travel_location)):\n", "        print(f\"From l3 we can reach: {travel_location}\")\n", "state_at_l4 = simulator.apply(state_at_l3, move, (l3, l4))\n", "if simulator.is_applicable(state_at_l4, move, (l4, l5)):\n", "    print('Done!')\n", "else:\n", "    print(f'Problem! Battery too low: {state_at_l4.get_value(battery_exp)}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We found a problem in our simulation. After going from `l1` directly to `l3` and then to `l4`, the remaining battery (20) is not enough to cover the distance of 30 to `l5`.\n", "\n", "The only decision we made, reaching `l3` directly from `l1`, seems to be the wrong one, so let's try to reach `l3` by passing through `l2` and see if that saves battery."
] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Reaching l3 passing through l2 saves battery!\n" ] } ], "source": [ "state_at_l2 = simulator.apply(initial_state, move, (l1, l2))\n", "new_state_at_l3 = simulator.apply(state_at_l2, move, (l2, l3))\n", "new_state_better = (new_state_at_l3.get_value(battery_exp) > state_at_l3.get_value(battery_exp)).simplify()\n", "if new_state_better.bool_constant_value():\n", "    print(\"Reaching l3 passing through l2 saves battery!\")\n", "else:\n", "    print(\"Can't save battery reaching l3, the problem has no solution!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we saw, this saves battery, so let's try reaching `l5` with this new battery value." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Done!\n" ] } ], "source": [ "state_at_l3 = new_state_at_l3\n", "state_at_l4 = simulator.apply(state_at_l3, move, (l3, l4))\n", "if simulator.is_applicable(state_at_l4, move, (l4, l5)):\n", "    print('Done!')\n", "else:\n", "    print(f'Problem! Battery too low: {state_at_l4.get_value(battery_exp)}')\n", "state_at_l5 = simulator.apply(state_at_l4, move, (l4, l5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Simulating a SequentialPlan and inspecting the State\n", "\n", "As we saw, a possible plan is to go through the locations in order, so let's create a `SequentialPlan` that does this.\n", "\n", "After creating the plan, we can simulate it and, for example, store every state it visits in a list while printing the battery value after each step."
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Initial battery value: 100\n", "Battery value after move(l1, l2): 80\n", "Battery value after move(l2, l3): 50\n", "Battery value after move(l3, l4): 30\n", "Battery value after move(l4, l5): 0\n" ] } ], "source": [ "from unified_planning.plans import SequentialPlan, ActionInstance\n", "\n", "plan = SequentialPlan([\n", "    ActionInstance(move, (l1, l2)),\n", "    ActionInstance(move, (l2, l3)),\n", "    ActionInstance(move, (l3, l4)),\n", "    ActionInstance(move, (l4, l5))\n", "])\n", "print(f\"Initial battery value: {initial_state.get_value(battery_exp).constant_value()}\")\n", "current_state = initial_state\n", "# We also store the states to plot the metrics later\n", "states = [current_state]\n", "for action_instance in plan.actions:\n", "    current_state = simulator.apply(current_state, action_instance)\n", "    if current_state is None:\n", "        print(f'Error in applying: {action_instance}')\n", "        break\n", "    states.append(current_state)\n", "    current_battery_value = current_state.get_value(battery_exp).constant_value()\n", "    # in current_battery_value we inspect the State\n", "    print(f\"Battery value after {action_instance}: {current_battery_value}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After storing the states visited during the plan, we can inspect them however we like; in this example we plot the battery value to see how it decreases at each step." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from unified_planning.plot import plot_sequential_plan" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ignore the code below; it is only used to make this notebook runnable in Continuous Integration."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Redefine the plot package methods imported above to print the plot to a temp file\n", "# if the exception \"could not locate runnable browser\" is raised. This usually happens\n", "# in the Continuous Integration.\n", "\n", "from inspect import getmembers, isfunction\n", "from unified_planning import plot\n", "from functools import partial\n", "import os, uuid, tempfile as tf\n", "\n", "# Define the function that will be executed instead\n", "def _function(original_function, *args, **kwargs):\n", "    try:\n", "        original_function(*args, **kwargs)\n", "    except Exception as e:\n", "        if \"could not locate runnable browser\" in str(e):\n", "            original_function(*args, **kwargs,\n", "                filename=f\"{os.path.join(tf.gettempdir(), str(uuid.uuid1()))}.png\"\n", "            )\n", "        else:\n", "            raise e\n", "\n", "# Iterate over all the functions of the plot package\n", "for function_name, function in getmembers(plot, isfunction):\n", "    # Override the original function with the new one\n", "    globals()[function_name] = partial(_function, function)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot_sequential_plan(plan, problem, battery_exp, figsize=(9, 3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Evaluating the Plan Quality Metrics\n", "\n", "The UP `engines` package offers utility functions that take a `SequentialSimulator` and evaluate a `PlanQualityMetric` step by step: starting from the metric value in the initial state, the value is updated after each action using the `State` before the action, the action with its actual parameters, and the `State` after it."
] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Plan length: 1\n", "Maximized expression value: 80\n", "Plan length: 2\n", "Maximized expression value: 50\n", "Plan length: 3\n", "Maximized expression value: 30\n", "Plan length: 4\n", "Maximized expression value: 0\n" ] } ], "source": [ "from unified_planning.engines.sequential_simulator import evaluate_quality_metric, evaluate_quality_metric_in_initial_state\n", "\n", "plan_length = MinimizeSequentialPlanLength()\n", "maximize_battery = MaximizeExpressionOnFinalState(battery_exp)\n", "\n", "plan_length_value = evaluate_quality_metric_in_initial_state(simulator, plan_length)\n", "maximize_battery_value = evaluate_quality_metric_in_initial_state(simulator, maximize_battery)\n", "\n", "current_state = states[0]\n", "for next_state, action_instance in zip(states[1:], plan.actions):\n", "    plan_length_value = evaluate_quality_metric(\n", "        simulator,\n", "        plan_length,\n", "        plan_length_value,\n", "        current_state,\n", "        action_instance.action,\n", "        action_instance.actual_parameters,\n", "        next_state\n", "    )\n", "    maximize_battery_value = evaluate_quality_metric(\n", "        simulator,\n", "        maximize_battery,\n", "        maximize_battery_value,\n", "        current_state,\n", "        action_instance.action,\n", "        action_instance.actual_parameters,\n", "        next_state\n", "    )\n", "    current_state = next_state\n", "\n", "    # Do something with the metric values\n", "    print(f'Plan length: {plan_length_value}\\nMaximized expression value: {maximize_battery_value}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The values of the metrics can also be plotted." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "try:\n", "    plot_sequential_plan(plan, problem, metric_or_metrics=[plan_length, maximize_battery], figsize=(9, 3))\n", "except Exception as e:\n", "    if \"could not locate runnable browser\" in str(e):\n", "        # Fall back to writing the same metrics plot to a file\n", "        plot_sequential_plan(plan, problem, metric_or_metrics=[plan_length, maximize_battery], figsize=(9, 3), filename=\"sequential_simulator_2.png\")\n", "    else:\n", "        raise e" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "Sequential Simulator", "provenance": [] }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" }, "vscode": { "interpreter": { "hash": "fcfc934ecfdac8ddac62d6a80ba8d82faf47dc8d54fd6a313f0c016b85ebec0e" } } }, "nbformat": 4, "nbformat_minor": 1 }