# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module defines the Method class.
A Method has a name, a list of Parameters, a list of conditions
and a list of subtasks.
"""
from collections import OrderedDict
from typing import List, Union, Optional
import unified_planning as up
from unified_planning.environment import Environment
from unified_planning.exceptions import UPUnboundedVariablesError, UPValueError
from unified_planning.model.htn.task_network import AbstractTaskNetwork
from unified_planning.model.parameter import Parameter
from unified_planning.model.htn.task import Task
from unified_planning.model.expression import Expression
class ParameterizedTask:
"""A task instantiated with some parameters."""
def __init__(self, task: Task, *params: Parameter):
self._task = task
self._params: List[Parameter] = list(params)
assert len(self._task.parameters) == len(self._params)
# TODO #153: check that the type of each parameter is compatible with the task' signature
def __repr__(self):
return str(self._task.name) + "(" + ", ".join(map(str, self.parameters)) + ")"
def __eq__(self, other):
return (
isinstance(other, ParameterizedTask)
and self._task == other._task
and self._params == other._params
)
def __hash__(self):
return hash(self._task) + sum(map(hash, self._params))
@property
def task(self) -> Task:
return self._task
@property
def parameters(self) -> List["up.model.parameter.Parameter"]:
return self._params
[docs]
class Method(AbstractTaskNetwork):
"""HTN Method: encoding of a procedure for achieving a high-level task."""
def __init__(
self,
_name: str,
_parameters: Optional[
"Union[OrderedDict[str, up.model.types.Type], List[Parameter]]"
] = None,
_env: Optional[Environment] = None,
**kwargs: "up.model.types.Type",
):
super(Method, self).__init__(_env)
self._task: Optional[ParameterizedTask] = None
self._name = _name
self._parameters: "OrderedDict[str, Parameter]" = OrderedDict()
self._preconditions: List[up.model.fnode.FNode] = []
if _parameters is None:
for n, t in kwargs.items():
self._parameters[n] = Parameter(n, t, self._env)
elif isinstance(_parameters, List):
assert len(kwargs) == 0
for p in _parameters:
self._parameters[p.name] = p
else:
assert isinstance(_parameters, OrderedDict)
assert len(kwargs) == 0
for n, t in _parameters.items():
self._parameters[n] = Parameter(n, t, self._env)
def __repr__(self) -> str:
s = []
s.append(f"method {self.name}")
first = True
for p in self.parameters:
if first:
s.append("(")
first = False
else:
s.append(", ")
s.append(str(p))
if not first:
s.append(")")
s.append(" {\n")
s.append(f" task = {self._task}\n")
if len(self.preconditions) > 0:
s.append(" preconditions = [\n")
for c in self.preconditions:
s.append(f" {str(c)}\n")
s.append(" ]\n")
if len(self.constraints) > 0:
s.append(" constraints = [\n")
for c in self.constraints:
s.append(f" {str(c)}\n")
s.append(" ]\n")
if len(self.subtasks) > 0:
s.append(" subtasks = [\n")
for st in self.subtasks:
s.append(f" {str(st)}\n")
s.append(" ]\n")
s.append("}")
return "".join(s)
def __eq__(self, oth: object) -> bool:
if not isinstance(oth, Method):
return False
return (
self._env == oth._env
and self._name == oth._name
and self._parameters == oth._parameters
and self._task == oth._task
and set(self._preconditions) == set(oth._preconditions)
and set(self.subtasks) == set(oth.subtasks)
and set(self.constraints) == set(oth.constraints)
)
def __hash__(self) -> int:
res = hash(self._name)
res += hash(self._task)
res += sum(map(hash, self.parameters))
res += sum(map(hash, self._preconditions))
res += sum(map(hash, self.constraints))
res += sum(map(hash, self.subtasks))
return res
@property
def name(self) -> str:
"""Returns the action name."""
return self._name
@property
def achieved_task(self) -> ParameterizedTask:
"""Returns the task that this method achieves."""
assert (
self._task is not None
), "The achieved task was previously set (see the set_task method)."
return self._task
[docs]
def set_task(self, task: Union[Task, ParameterizedTask], *arguments: Parameter):
"""Defines the task that is method achieves.
It expects a Task and its arguments, either bundle in a `ParameterizedTask` instance of
passed separetly.
It is assumed that each parameter of the achieved task is a parameter of the method.
# Examples
>>> from unified_planning.shortcuts import *
>>> from unified_planning.model.htn import *
>>> Location = UserType("Location")
>>> go = Task("go", target=Location)
>>> m1 = Method("m-go1", target=Location)
>>> task_achieved = ParameterizedTask(go, m1.parameter("target"))
>>> m1.set_task(task_achieved)
>>> m2 = Method("m-go2", source=Location, target=Location)
>>> m2.set_task(go, m2.parameter("target"))
>>> m3 = Method("m-go3", source=Location, target=Location)
>>> m3.set_task(go) # Infer the parameters of the `go` task from the parameters of m3 with the same name
"""
assert self._task is None, f"Method {self.name} was already assigned a task"
if isinstance(task, ParameterizedTask):
assert (
len(arguments) == 0
), "Unexpected arguments passed along a ParameterizedTask"
assert all(
p in self.parameters for p in task.parameters
), "A parameter of the task does not appear as a parameter of the method."
self._task = task
elif isinstance(task, Task) and len(arguments) == 0:
for task_param in task.parameters:
assert (
task_param.name in self._parameters
), f"Missing task parameter '{task_param.name}' in method {self._name}. Please pass all parameters explicitly."
self._task = ParameterizedTask(task, *task.parameters)
else:
assert all(
p in self.parameters for p in arguments
), "An argument passed to the task does not appear as a parameter of the method."
self._task = ParameterizedTask(task, *arguments)
@property
def parameters(self) -> List[Parameter]:
"""Returns the list of the method's parameters."""
return list(self._parameters.values())
[docs]
def parameter(self, name: str) -> Parameter:
"""
Returns the `parameter` of the `Method` with the given `name`.
Example
-------
>>> from unified_planning.shortcuts import *
>>> from unified_planning.model.htn import *
>>> location_type = UserType("Location")
>>> robot_type = UserType("Robot")
>>> goto = Method("goto", robot=robot_type, target=location_type)
>>> goto.parameter("robot") # return the "robot" parameter of the method, with type "Robot"
Robot robot
>>> goto.parameter("target")
Location target
If a parameter's name (1) does not conflict with an existing attribute of `Method` and (2) does not start with '_'
it can also be accessed as if it was an attribute of the method. For instance:
>>> goto.target
Location target
:param name: The `name` of the target `parameter`.
:return: The `parameter` of the `Method` with the given `name`.
"""
for param in self.parameters:
if param.name == name:
return param
raise UPValueError(f"Unknown parameter name: {name}")
def __getattr__(self, parameter_name: str) -> "up.model.parameter.Parameter":
if parameter_name.startswith("_"):
# guard access as pickling relies on attribute error to be thrown even when
# no attributes of the object have been set.
# In this case accessing `self._name` or `self._parameters`, would re-invoke __getattr__
raise AttributeError(f"Method has no attribute '{parameter_name}'")
if parameter_name not in self._parameters:
raise AttributeError(
f"Method '{self.name}' has no attribute or parameter '{parameter_name}'"
)
return self._parameters[parameter_name]
@property
def preconditions(self) -> List["up.model.fnode.FNode"]:
"""Returns the list of the method's preconditions."""
return self._preconditions
[docs]
def add_precondition(self, precondition: Expression):
"""Adds the given method precondition."""
(precondition_exp,) = self._env.expression_manager.auto_promote(precondition)
assert self._env.type_checker.get_type(precondition_exp).is_bool_type()
if precondition_exp == self._env.expression_manager.TRUE():
return
free_vars = self._env.free_vars_oracle.get_free_variables(precondition_exp)
if len(free_vars) != 0:
raise UPUnboundedVariablesError(
f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}"
)
if precondition_exp not in self._preconditions:
self._preconditions.append(precondition_exp)