Planner Integration

In this simple demo we will create a new planning engine for the “Oneshot” operation mode, we will register the new planner in the UP library and we will test it on a simple problem.

Open In GitHub Open In Colab

NOTE: The unified planning library has auxiliary base classes for integrating stand-alone PDDL planners (not described in this notebook). They take care of writing the PDDL files, calling your planner and collecting the plan from disk on various operating systems. For oneshot planning, use engines.PDDLPlanner. For anytime planning there is engines.PDDLAnytimePlanner.

Setup

We start by installing the library with PIP

[ ]:
%pip install unified-planning

Define the planning engine

A planning engine is a class inheriting from the unified_planning.engines.Engine class. The class shall inheriting also from at least one of the mixins class in unified_planning.engines.mixins.

Custom options can be passed as keyword arguments in the constructor and each planning engine shall have a name specified by implementing the name() method.

In order for the UP library to filter applicable engines, the supports method must be implemented. The method shall return true if the engine is capable of dealing with the given ProblemKind, that is a class storing information on the problem class (much like the :requiremnets specification of PDDL).

In this example, we create a simple OneshotPlanner (see unified_planning.engines.factory for details and other operation modes). A OneshotPlanner is an instance of unified_planning.engines.Engine and unified_planning.engines.mixins.OneshotPlannerMixin that implements the solve() method returning a up.engines.results.PlanGenerationResult that contains plan in the form of an instance (or subclass) of unified_planning.plan.Plan if a plan could be found. In the code below, we implement a simple random walker that uses a PlanValidator (another operation mode) as a way to check if prefixes of plans are valid.

[2]:
import random
from typing import Callable, IO, Optional
import unified_planning as up
from unified_planning import engines

class MySolverImpl(up.engines.Engine,
                   up.engines.mixins.OneshotPlannerMixin):
    def __init__(self, **options):
        # Read known user-options and store them for using in the `solve` method
        up.engines.Engine.__init__(self)
        up.engines.mixins.OneshotPlannerMixin.__init__(self)
        self.max_tries = options.get('max_tries', None)
        self.restart_probability = options.get('restart_probability', 0.00001)

    @property
    def name(self) -> str:
        return "YOLOPlanner"

    @staticmethod
    def supported_kind():
        # For this demo we limit ourselves to numeric planning.
        # Other kinds of problems can be modeled in the UP library,
        # see unified_planning.model.problem_kind.
        supported_kind = up.model.ProblemKind()
        supported_kind.set_problem_class("ACTION_BASED")
        supported_kind.set_problem_type("GENERAL_NUMERIC_PLANNING")
        supported_kind.set_typing('FLAT_TYPING')
        supported_kind.set_typing('HIERARCHICAL_TYPING')
        supported_kind.set_numbers('CONTINUOUS_NUMBERS')
        supported_kind.set_numbers('DISCRETE_NUMBERS')
        supported_kind.set_fluents_type('NUMERIC_FLUENTS')
        supported_kind.set_numbers('BOUNDED_TYPES')
        supported_kind.set_fluents_type('OBJECT_FLUENTS')
        supported_kind.set_conditions_kind('NEGATIVE_CONDITIONS')
        supported_kind.set_conditions_kind('DISJUNCTIVE_CONDITIONS')
        supported_kind.set_conditions_kind('EQUALITIES')
        supported_kind.set_conditions_kind('EXISTENTIAL_CONDITIONS')
        supported_kind.set_conditions_kind('UNIVERSAL_CONDITIONS')
        supported_kind.set_effects_kind('CONDITIONAL_EFFECTS')
        supported_kind.set_effects_kind('INCREASE_EFFECTS')
        supported_kind.set_effects_kind('DECREASE_EFFECTS')
        supported_kind.set_effects_kind('FLUENTS_IN_NUMERIC_ASSIGNMENTS')

        return supported_kind

    @staticmethod
    def supports(problem_kind):
        return problem_kind <= MySolverImpl.supported_kind()

    def _solve(self, problem: 'up.model.Problem',
              callback: Optional[Callable[['up.engines.PlanGenerationResult'], None]] = None,
              timeout: Optional[float] = None,
              output_stream: Optional[IO[str]] = None) -> 'up.engines.PlanGenerationResult':
        env = problem.environment

        # First we ground the problem
        with env.factory.Compiler(problem_kind=problem.kind, compilation_kind=up.engines.CompilationKind.GROUNDING) as grounder:
            grounding_result = grounder.compile(problem, up.engines.CompilationKind.GROUNDING)
        grounded_problem = grounding_result.problem

        # We store the grounded actions in a list
        actions = list(grounded_problem.instantaneous_actions)

        # The candidate plan, initially empty
        plan = up.plans.SequentialPlan([])

        # Ask for an instance of a PlanValidator by name
        # (`sequential_plan_validator` is a python implementation of the
        # PlanValidator operation mode offered by the UP library)
        with env.factory.PlanValidator(name='sequential_plan_validator') as pv:
            counter = 0
            while True:
                # With a certain probability, restart from scratch to avoid dead-ends
                if random.random() < self.restart_probability:
                    plan = up.plans.SequentialPlan()
                else:
                    # Select a random action
                    a = random.choice(actions)
                    # Create the relative action instance
                    ai = up.plans.ActionInstance(a)
                    # Append the action to the plan
                    plan.actions.append(ai)

                    # Check plan validity
                    res = pv.validate(grounded_problem, plan)
                    if res:
                        # If the plan is valid, lift the action instances and
                        # return the resulting plan
                        resplan = plan.replace_action_instances(grounding_result.map_back_action_instance)
                        # Sanity check
                        assert pv.validate(problem, resplan)
                        status = up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING
                        return up.engines.PlanGenerationResult(status, resplan, self.name)
                    else:
                        # If the plan is invalid, check if the reason is action
                        # applicability (as opposed to goal satisfaction)
                        einfo = res.log_messages[0].message
                        if 'Goals' not in einfo:
                            # If the plan is not executable, remove the last action
                            plan.actions.pop()
                    # Limit the number of tries, according to the user specification
                    counter += 1
                    if self.max_tries is not None and counter >= self.max_tries:
                        status = up.engines.PlanGenerationResultStatus.TIMEOUT
                        return up.engines.PlanGenerationResult(status, None, self.name)

    def destroy(self):
        pass

Congratulations! You just created an integrated planning engine for the Oneshot operation mode! Of course, in a more realistic scenario, one would need to read the problem object and call an external engine or provide a more involved algorithm to provide the service, but this is already sufficient for testing out our simple engine.

Registering the engine

In order to use our YOLOPlanner, we need to register it among the set of planning engines available for the UP library as follows.

[3]:
env = up.environment.get_environment()
env.factory.add_engine('yoloplanner', __name__, 'MySolverImpl')

Essentially, we just need to give a custom name (in our case yoloplanner) a module name (in thic case, __name__ as we are in the same file as the Solver class) and finally the class name that we used to define our planning engine.

Done! We are now ready to test our planning engine!

Testing the engine

We start by defining a simple problem in the UP syntax (alternatively we can use a parser or any other way to create a problem obejct)

[4]:
from unified_planning.shortcuts import *

problem = Problem('robot')
Location = UserType('Location')
robot_at = problem.add_fluent('robot_at', BoolType(), loc=Location)
battery_charge = problem.add_fluent('battery_charge', RealType(0, 100))

move = InstantaneousAction('move', l_from=Location, l_to=Location)
l_from = move.l_from
l_to = move.l_to
move.add_precondition(battery_charge >= 10)
move.add_precondition(Not(Equals(l_from, l_to)))
move.add_precondition(robot_at(l_from))
move.add_effect(robot_at(l_from), False)
move.add_effect(robot_at(l_to), True)
move.add_effect(battery_charge, battery_charge - 10)
problem.add_action(move)

l1 = problem.add_object('l1', Location)
l2 = problem.add_object('l2', Location)
problem.set_initial_value(robot_at(l1), True)
problem.set_initial_value(robot_at(l2), False)
problem.set_initial_value(battery_charge, 100)
problem.add_goal(robot_at(l2))

Then, we invoke the operation mode as for any other planner in the UP library!

[5]:
with OneshotPlanner(name='yoloplanner', params = {'max_tries' : 5}) as p:
    result = p.solve(problem)
    if result.status == up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING:
        print(f'{p.name} found a valid plan!')
        print(result.plan)
    else:
        print('No plan found!')

YOLOPlanner found a valid plan!
SequentialPlan:
    move(l1, l2)