Planner Integration
In this simple demo we will create a new planning engine for the “Oneshot” operation mode, we will register the new planner in the UP library and we will test it on a simple problem.
NOTE: The unified planning library has auxiliary base classes for integrating stand-alone PDDL planners (not described in this notebook). They take care of writing the PDDL files, calling your planner and collecting the plan from disk on various operating systems. For oneshot planning, use engines.PDDLPlanner. For anytime planning there is engines.PDDLAnytimePlanner.
Setup
We start by installing the library with PIP
[ ]:
%pip install unified-planning
Define the planning engine
A planning engine is a class inheriting from the unified_planning.engines.Engine
class. The class shall inheriting also from at least one of the mixins class in unified_planning.engines.mixins
.
Custom options can be passed as keyword arguments in the constructor and each planning engine shall have a name specified by implementing the name()
method.
In order for the UP library to filter applicable engines, the supports
method must be implemented. The method shall return true if the engine is capable of dealing with the given ProblemKind
, that is a class storing information on the problem class (much like the :requiremnets
specification of PDDL).
In this example, we create a simple OneshotPlanner
(see unified_planning.engines.factory
for details and other operation modes). A OneshotPlanner
is an instance of unified_planning.engines.Engine
and unified_planning.engines.mixins.OneshotPlannerMixin
that implements the solve()
method returning a up.engines.results.PlanGenerationResult
that contains plan in the form of an instance (or subclass) of unified_planning.plan.Plan
if a plan could be found. In the code
below, we implement a simple random walker that uses a PlanValidator
(another operation mode) as a way to check if prefixes of plans are valid.
[2]:
import random
from typing import Callable, IO, Optional
import unified_planning as up
from unified_planning import engines
class MySolverImpl(up.engines.Engine,
up.engines.mixins.OneshotPlannerMixin):
def __init__(self, **options):
# Read known user-options and store them for using in the `solve` method
up.engines.Engine.__init__(self)
up.engines.mixins.OneshotPlannerMixin.__init__(self)
self.max_tries = options.get('max_tries', None)
self.restart_probability = options.get('restart_probability', 0.00001)
@property
def name(self) -> str:
return "YOLOPlanner"
@staticmethod
def supported_kind():
# For this demo we limit ourselves to numeric planning.
# Other kinds of problems can be modeled in the UP library,
# see unified_planning.model.problem_kind.
supported_kind = up.model.ProblemKind()
supported_kind.set_problem_class("ACTION_BASED")
supported_kind.set_problem_type("GENERAL_NUMERIC_PLANNING")
supported_kind.set_typing('FLAT_TYPING')
supported_kind.set_typing('HIERARCHICAL_TYPING')
supported_kind.set_numbers('CONTINUOUS_NUMBERS')
supported_kind.set_numbers('DISCRETE_NUMBERS')
supported_kind.set_fluents_type('NUMERIC_FLUENTS')
supported_kind.set_numbers('BOUNDED_TYPES')
supported_kind.set_fluents_type('OBJECT_FLUENTS')
supported_kind.set_conditions_kind('NEGATIVE_CONDITIONS')
supported_kind.set_conditions_kind('DISJUNCTIVE_CONDITIONS')
supported_kind.set_conditions_kind('EQUALITIES')
supported_kind.set_conditions_kind('EXISTENTIAL_CONDITIONS')
supported_kind.set_conditions_kind('UNIVERSAL_CONDITIONS')
supported_kind.set_effects_kind('CONDITIONAL_EFFECTS')
supported_kind.set_effects_kind('INCREASE_EFFECTS')
supported_kind.set_effects_kind('DECREASE_EFFECTS')
supported_kind.set_effects_kind('FLUENTS_IN_NUMERIC_ASSIGNMENTS')
return supported_kind
@staticmethod
def supports(problem_kind):
return problem_kind <= MySolverImpl.supported_kind()
def _solve(self, problem: 'up.model.Problem',
callback: Optional[Callable[['up.engines.PlanGenerationResult'], None]] = None,
timeout: Optional[float] = None,
output_stream: Optional[IO[str]] = None) -> 'up.engines.PlanGenerationResult':
env = problem.environment
# First we ground the problem
with env.factory.Compiler(problem_kind=problem.kind, compilation_kind=up.engines.CompilationKind.GROUNDING) as grounder:
grounding_result = grounder.compile(problem, up.engines.CompilationKind.GROUNDING)
grounded_problem = grounding_result.problem
# We store the grounded actions in a list
actions = list(grounded_problem.instantaneous_actions)
# The candidate plan, initially empty
plan = up.plans.SequentialPlan([])
# Ask for an instance of a PlanValidator by name
# (`sequential_plan_validator` is a python implementation of the
# PlanValidator operation mode offered by the UP library)
with env.factory.PlanValidator(name='sequential_plan_validator') as pv:
counter = 0
while True:
# With a certain probability, restart from scratch to avoid dead-ends
if random.random() < self.restart_probability:
plan = up.plans.SequentialPlan()
else:
# Select a random action
a = random.choice(actions)
# Create the relative action instance
ai = up.plans.ActionInstance(a)
# Append the action to the plan
plan.actions.append(ai)
# Check plan validity
res = pv.validate(grounded_problem, plan)
if res:
# If the plan is valid, lift the action instances and
# return the resulting plan
resplan = plan.replace_action_instances(grounding_result.map_back_action_instance)
# Sanity check
assert pv.validate(problem, resplan)
status = up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING
return up.engines.PlanGenerationResult(status, resplan, self.name)
else:
# If the plan is invalid, check if the reason is action
# applicability (as opposed to goal satisfaction)
einfo = res.log_messages[0].message
if 'Goals' not in einfo:
# If the plan is not executable, remove the last action
plan.actions.pop()
# Limit the number of tries, according to the user specification
counter += 1
if self.max_tries is not None and counter >= self.max_tries:
status = up.engines.PlanGenerationResultStatus.TIMEOUT
return up.engines.PlanGenerationResult(status, None, self.name)
def destroy(self):
pass
Congratulations! You just created an integrated planning engine for the Oneshot
operation mode! Of course, in a more realistic scenario, one would need to read the problem
object and call an external engine or provide a more involved algorithm to provide the service, but this is already sufficient for testing out our simple engine.
Registering the engine
In order to use our YOLOPlanner
, we need to register it among the set of planning engines available for the UP library as follows.
[3]:
env = up.environment.get_environment()
env.factory.add_engine('yoloplanner', __name__, 'MySolverImpl')
Essentially, we just need to give a custom name (in our case yoloplanner
) a module name (in thic case, __name__
as we are in the same file as the Solver class) and finally the class name that we used to define our planning engine.
Done! We are now ready to test our planning engine!
Testing the engine
We start by defining a simple problem in the UP syntax (alternatively we can use a parser or any other way to create a problem obejct)
[4]:
from unified_planning.shortcuts import *
problem = Problem('robot')
Location = UserType('Location')
robot_at = problem.add_fluent('robot_at', BoolType(), loc=Location)
battery_charge = problem.add_fluent('battery_charge', RealType(0, 100))
move = InstantaneousAction('move', l_from=Location, l_to=Location)
l_from = move.l_from
l_to = move.l_to
move.add_precondition(battery_charge >= 10)
move.add_precondition(Not(Equals(l_from, l_to)))
move.add_precondition(robot_at(l_from))
move.add_effect(robot_at(l_from), False)
move.add_effect(robot_at(l_to), True)
move.add_effect(battery_charge, battery_charge - 10)
problem.add_action(move)
l1 = problem.add_object('l1', Location)
l2 = problem.add_object('l2', Location)
problem.set_initial_value(robot_at(l1), True)
problem.set_initial_value(robot_at(l2), False)
problem.set_initial_value(battery_charge, 100)
problem.add_goal(robot_at(l2))
Then, we invoke the operation mode as for any other planner in the UP library!
[5]:
with OneshotPlanner(name='yoloplanner', params = {'max_tries' : 5}) as p:
result = p.solve(problem)
if result.status == up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING:
print(f'{p.name} found a valid plan!')
print(result.plan)
else:
print('No plan found!')
YOLOPlanner found a valid plan!
SequentialPlan:
move(l1, l2)