12. Planner Integration

In this simple demo we create a new planning engine for the “Oneshot” operation mode, register it in the UP library, and test it on a simple problem.


12.1. Setup the UP library

We start by installing the library with pip.

[2]:
!pip install --pre unified-planning

12.2. Define the planning engine

A planning engine is a class inheriting from the unified_planning.engines.Engine class. The class must also inherit from at least one of the mixin classes in unified_planning.engines.mixins.

Custom options can be passed as keyword arguments to the constructor, and each planning engine must have a name, specified by implementing the name property.

For the UP library to filter applicable engines, the supports method must be implemented. The method must return True if the engine can handle the given ProblemKind, a class that stores information about the problem class (much like the :requirements specification of PDDL).
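The filtering idea can be illustrated without the library: if the features a problem needs and the features an engine supports are both modelled as sets, the check becomes a subset test. The sketch below is a toy stand-in that uses plain Python frozensets. The helper `engine_supports` and the `SUPPORTED` constant are hypothetical names introduced for illustration and are not part of the UP API.

```python
# Toy illustration only: a problem kind is modelled as a frozenset of
# feature names. The real ProblemKind class lives in unified_planning.model.
SUPPORTED = frozenset({
    "FLAT_TYPING",
    "NEGATIVE_CONDITIONS",
    "EQUALITIES",
    "CONTINUOUS_NUMBERS",
})


def engine_supports(problem_features: frozenset) -> bool:
    """Return True when every feature the problem needs is supported."""
    return problem_features <= SUPPORTED


simple_problem = frozenset({"FLAT_TYPING", "NEGATIVE_CONDITIONS"})
temporal_problem = frozenset({"FLAT_TYPING", "CONTINUOUS_TIME"})

print(engine_supports(simple_problem))    # True
print(engine_supports(temporal_problem))  # False
```

A single unsupported feature is enough to exclude the engine, which is why the supported kind should list everything the engine can actually handle.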

In this example, we create a simple OneshotPlanner (see unified_planning.engines.factory for details and other operation modes). A OneshotPlanner is a class that inherits from unified_planning.engines.Engine and unified_planning.engines.mixins.OneshotPlannerMixin and implements the _solve() method, which is invoked through the public solve() method and returns a up.engines.results.PlanGenerationResult. If a plan could be found, the result contains it as an instance of unified_planning.plans.Plan (or of a subclass). In the code below, we implement a simple random walker that uses a PlanValidator (another operation mode) to check whether prefixes of plans are valid.

[4]:
import random
from typing import Optional, Callable, IO
from unified_planning.engines.results import PlanGenerationResultStatus
import unified_planning as up
import unified_planning.engines as engines
from unified_planning.model import ProblemKind

class MySolverImpl(engines.Engine,
                   engines.mixins.OneshotPlannerMixin):
    def __init__(self, **options):
        # Read known user-options and store them for using in the `solve` method
        engines.Engine.__init__(self)
        engines.mixins.OneshotPlannerMixin.__init__(self)
        self.max_tries = options.get('max_tries', None)
        self.restart_probability = options.get('restart_probability', 0.00001)

    @property
    def name(self) -> str:
        return "YOLOPlanner"

    @staticmethod
    def supported_kind():
        # For this demo we limit ourselves to numeric planning.
        # Other kinds of problems can be modeled in the UP library,
        # see unified_planning.model.problem_kind.
        supported_kind = ProblemKind()
        supported_kind.set_typing('FLAT_TYPING')
        supported_kind.set_typing('HIERARCHICAL_TYPING')
        supported_kind.set_numbers('CONTINUOUS_NUMBERS')
        supported_kind.set_numbers('DISCRETE_NUMBERS')
        supported_kind.set_fluents_type('NUMERIC_FLUENTS')
        supported_kind.set_fluents_type('OBJECT_FLUENTS')
        supported_kind.set_conditions_kind('NEGATIVE_CONDITIONS')
        supported_kind.set_conditions_kind('DISJUNCTIVE_CONDITIONS')
        supported_kind.set_conditions_kind('EQUALITIES')
        supported_kind.set_conditions_kind('EXISTENTIAL_CONDITIONS')
        supported_kind.set_conditions_kind('UNIVERSAL_CONDITIONS')
        supported_kind.set_effects_kind('CONDITIONAL_EFFECTS')
        supported_kind.set_effects_kind('INCREASE_EFFECTS')
        supported_kind.set_effects_kind('DECREASE_EFFECTS')
        return supported_kind

    @staticmethod
    def supports(problem_kind):
        return problem_kind <= MySolverImpl.supported_kind()

    def _solve(self, problem: 'up.model.Problem',
              callback: Optional[Callable[['up.engines.PlanGenerationResult'], None]] = None,
              timeout: Optional[float] = None,
              output_stream: Optional[IO[str]] = None) -> 'up.engines.results.PlanGenerationResult':
        env = problem.environment

        # First we ground the problem
        with env.factory.Compiler(problem_kind=problem.kind, compilation_kind=engines.CompilationKind.GROUNDING) as grounder:
            grounding_result = grounder.compile(problem, engines.CompilationKind.GROUNDING)
        grounded_problem = grounding_result.problem

        # We store the grounded actions in a list
        actions = list(grounded_problem.instantaneous_actions)

        # The candidate plan, initially empty
        plan = up.plans.SequentialPlan([])

        # Ask for an instance of a PlanValidator by name
        # (`sequential_plan_validator` is a python implementation of the
        # PlanValidator operation mode offered by the UP library)
        with env.factory.PlanValidator(name='sequential_plan_validator') as pv:
            counter = 0
            while True:
                # With a certain probability, restart from scratch to avoid dead-ends
                if random.random() < self.restart_probability:
                    plan = up.plans.SequentialPlan([])
                else:
                    # Select a random action
                    a = random.choice(actions)
                    # Create the relative action instance
                    ai = up.plans.ActionInstance(a)
                    # Append the action to the plan
                    plan.actions.append(ai)

                    # Check plan validity
                    res = pv.validate(grounded_problem, plan)
                    if res:
                        # If the plan is valid, lift the action instances and
                        # return the resulting plan
                        resplan = plan.replace_action_instances(grounding_result.map_back_action_instance)
                        # Sanity check
                        assert pv.validate(problem, resplan)
                        return up.engines.PlanGenerationResult(PlanGenerationResultStatus.SOLVED_SATISFICING, resplan, self.name)
                    else:
                        # If the plan is invalid, check if the reason is action
                        # applicability (as opposed to goal satisfaction)
                        einfo = res.log_messages[0].message
                        if 'Goals' not in einfo:
                            # If the plan is not executable, remove the last action
                            plan.actions.pop()
                    # Limit the number of tries, according to the user specification
                    counter += 1
                    if self.max_tries is not None and counter >= self.max_tries:
                        return up.engines.PlanGenerationResult(PlanGenerationResultStatus.TIMEOUT, None, self.name)

    def destroy(self):
        pass

Congratulations! You just created an integrated planning engine for the Oneshot operation mode! Of course, in a more realistic scenario, one would need to read the problem object and either call an external engine or implement a more involved algorithm, but this is already sufficient for testing out our simple engine.
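The search strategy used by the engine (append a random action, undo it if the plan prefix is not executable, occasionally restart) can be sketched on its own, without the UP library. The example below assumes a hypothetical toy domain in which the state is an integer between 0 and 5; `ACTIONS`, `execute` and `random_walk` are illustrative names, and unlike the engine above every loop iteration counts as a try.

```python
import random
from typing import Callable, Dict, List, Optional

# Hypothetical toy domain (not the UP API): the state is an integer and each
# action maps a state to a new state, or to None when it is not applicable.
Action = Callable[[int], Optional[int]]

ACTIONS: Dict[str, Action] = {
    "inc": lambda s: s + 1 if s < 5 else None,
    "dec": lambda s: s - 1 if s > 0 else None,
}


def execute(plan: List[str], start: int = 0) -> Optional[int]:
    """Run the plan from `start`; return the final state or None if it is not executable."""
    state = start
    for name in plan:
        nxt = ACTIONS[name](state)
        if nxt is None:
            return None
        state = nxt
    return state


def random_walk(goal: int, max_tries: int, restart_probability: float,
                rng: random.Random) -> Optional[List[str]]:
    """Grow a plan one random action at a time until it reaches `goal`."""
    plan: List[str] = []
    for _ in range(max_tries):
        if rng.random() < restart_probability:
            plan = []  # restart from scratch
            continue
        plan.append(rng.choice(sorted(ACTIONS)))
        final = execute(plan)
        if final is None:
            plan.pop()  # last action was not applicable: undo it
        elif final == goal:
            return plan
    return None  # budget exhausted


plan = random_walk(goal=3, max_tries=10_000, restart_probability=0.01,
                   rng=random.Random(0))
print(plan is not None and execute(plan) == 3)
```

Re-validating the whole plan after every step is wasteful, but it keeps the sketch close to the structure of the engine, which delegates the same check to a PlanValidator.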

12.3. Registering the engine

In order to use our YOLOPlanner, we need to register it among the set of planning engines available for the UP library as follows.

[5]:
env = up.environment.get_environment()
env.factory.add_engine('yoloplanner', __name__, 'MySolverImpl')

Essentially, we just need to give a custom name (in our case yoloplanner), a module name (in this case __name__, as we are in the same file as the solver class), and finally the name of the class that defines our planning engine.

Done! We are now ready to test our planning engine!

12.4. Testing the engine

We start by defining a simple problem in the UP syntax (alternatively, we could use a parser or any other way to create a problem object).

[6]:
emgr = env.expression_manager

Location = env.type_manager.UserType('Location')
robot_at = up.model.Fluent('robot_at', env.type_manager.BoolType(), loc=Location)
battery_charge = up.model.Fluent('battery_charge', env.type_manager.RealType(0, 100))
move = up.model.InstantaneousAction('move', l_from=Location, l_to=Location)
l_from = move.parameter('l_from')
l_to = move.parameter('l_to')
move.add_precondition(emgr.GE(battery_charge, 10))
move.add_precondition(emgr.Not(emgr.Equals(l_from, l_to)))
move.add_precondition(robot_at(l_from))
move.add_precondition(emgr.Not(robot_at(l_to)))
move.add_effect(robot_at(l_from), False)
move.add_effect(robot_at(l_to), True)
move.add_effect(battery_charge, emgr.Minus(battery_charge, 10))
l1 = up.model.Object('l1', Location)
l2 = up.model.Object('l2', Location)
problem = up.model.Problem('robot')
problem.add_fluent(robot_at)
problem.add_fluent(battery_charge)
problem.add_action(move)
problem.add_object(l1)
problem.add_object(l2)
problem.set_initial_value(robot_at(l1), True)
problem.set_initial_value(robot_at(l2), False)
problem.set_initial_value(battery_charge, 100)
problem.add_goal(robot_at(l2))
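To see what a planner has to work with here, the four preconditions of move can be checked by hand against the initial state for every pair of locations. The sketch below does this in plain Python, independently of the UP library. The dictionary layout and the helper `move_applicable` are hypothetical and only mirror the model defined above.

```python
from itertools import product

locations = ["l1", "l2"]

# Initial state of the robot problem, as plain Python values.
state = {"robot_at": {"l1": True, "l2": False}, "battery_charge": 100}


def move_applicable(state: dict, l_from: str, l_to: str) -> bool:
    """Mirror the four preconditions of the `move` action."""
    return (
        state["battery_charge"] >= 10
        and l_from != l_to
        and state["robot_at"][l_from]
        and not state["robot_at"][l_to]
    )


# Enumerate all groundings of move(l_from, l_to) and keep the applicable ones.
applicable = [
    (l_from, l_to)
    for l_from, l_to in product(locations, repeat=2)
    if move_applicable(state, l_from, l_to)
]
print(applicable)  # [('l1', 'l2')]
```

Only move(l1, l2) is applicable in the initial state, and applying it makes robot_at(l2) true, which satisfies the goal.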

Then, we invoke the operation mode as for any other planner in the UP library!

[7]:
with env.factory.OneshotPlanner(name='yoloplanner') as p:
    result = p.solve(problem)
    if result.status == PlanGenerationResultStatus.SOLVED_SATISFICING:
        print(f'{p.name} found a valid plan!')
        print(f'The plan is: {result.plan}')
        print('\n'.join(str(x) for x in result.plan.actions))
    else:
        print('No plan found!')
YOLOPlanner found a valid plan!
The plan is: [move(l1, l2)]
move(l1, l2)