Source code for unified_planning.model.htn.hierarchical_problem

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
from typing import Optional, List, Union, Dict, Set
from warnings import warn

import unified_planning as up
from unified_planning.model.expression import ConstantExpression
from unified_planning.model.htn.method import Method
from unified_planning.model.htn.task import Task
from unified_planning.model.htn.task_network import TaskNetwork, AbstractTaskNetwork
from unified_planning.exceptions import UPProblemDefinitionError


[docs] class HierarchicalProblem(up.model.problem.Problem): def __init__( self, name: Optional[str] = None, environment: Optional["up.environment.Environment"] = None, *, initial_defaults: Dict[ "up.model.types.Type", ConstantExpression, ] = {}, ): super().__init__( name=name, environment=environment, initial_defaults=initial_defaults ) self._abstract_tasks: OrderedDict[str, Task] = OrderedDict() self._methods: OrderedDict[str, Method] = OrderedDict() self._initial_task_network = TaskNetwork() def __repr__(self): s = [super().__repr__()] s.append("abstract tasks = [\n") for t in self._abstract_tasks.values(): s.append(f" {t}\n") s.append("]\n\n") s.append("methods = [") for m in self._methods.values(): s.append(("\n" + str(m)).replace("\n", "\n ")) s.append("\n]\n\n") s.append(str(self._initial_task_network)) return "".join(s) def __eq__(self, oth: object) -> bool: if not super().__eq__(oth): return False if not isinstance(oth, HierarchicalProblem): return False return ( self._initial_task_network == oth._initial_task_network and self._methods == oth._methods and self._abstract_tasks == oth._abstract_tasks ) def __hash__(self): res = super().__hash__() res += sum(map(hash, self._abstract_tasks.values())) res += sum(map(hash, self._methods.values())) res += hash(self._initial_task_network) return res
[docs] def clone(self): new_p = HierarchicalProblem(self._name, self._env) new_p._fluents = self._fluents[:] new_p._actions = [a.clone() for a in self._actions] new_p._user_types = self._user_types[:] new_p._user_types_hierarchy = self._user_types_hierarchy.copy() new_p._objects = self._objects[:] new_p._initial_value = self._initial_value.copy() new_p._timed_effects = { t: [e.clone() for e in el] for t, el in self._timed_effects.items() } new_p._timed_goals = {i: [g for g in gl] for i, gl in self._timed_goals.items()} new_p._goals = self._goals[:] new_p._metrics = [] for m in self._metrics: if m.is_minimize_action_costs(): assert isinstance(m, up.model.metrics.MinimizeActionCosts) costs: Dict["up.model.Action", "up.model.Expression"] = { new_p.action(a.name): c for a, c in m.costs.items() } new_p._metrics.append(up.model.metrics.MinimizeActionCosts(costs)) else: new_p._metrics.append(m) new_p._initial_defaults = self._initial_defaults.copy() new_p._fluents_defaults = self._fluents_defaults.copy() new_p._initial_task_network = self._initial_task_network.clone() new_p._methods = self._methods.copy() new_p._abstract_tasks = self._abstract_tasks.copy() return new_p
[docs] def get_unused_fluents(self): """ Returns the set of `fluents` that are never used in the problem. """ # from our parents unused fluents, remove all those that appear in methods preconditions and constraints unused_fluents: Set["up.model.fluent.Fluent"] = super().get_unused_fluents() fve = self._env.free_vars_extractor # function that takes an FNode and removes all the fluents contained in the given FNode # from the unused_fluents set. remove_used_fluents = lambda *exps: unused_fluents.difference_update( (f.fluent() for e in exps for f in fve.get(e)) ) for m in self.methods: remove_used_fluents(*m.preconditions) remove_used_fluents(*m.constraints) remove_used_fluents(*self.task_network.constraints) return unused_fluents
@property def kind(self) -> "up.model.problem_kind.ProblemKind": """Returns the problem kind of this planning problem. IMPORTANT NOTE: this property does a lot of computation, so it should be called as minimum time as possible.""" factory = self._kind_factory() factory.kind.set_problem_class("HIERARCHICAL") factory.kind.unset_problem_class("ACTION_BASED") (TO, PO, TEMPORAL) = (0, 1, 2) def lvl(tn: AbstractTaskNetwork): """Determines the expressivity level of temporal constraints within a task network""" if tn.total_order() is not None: return TO elif tn.partial_order() is not None: return PO else: return TEMPORAL ordering_kind = lvl(self.task_network) if len(self.task_network.variables) > 0: factory.kind.set_hierarchical("INITIAL_TASK_NETWORK_VARIABLES") if len(self.task_network.non_temporal_constraints()) > 0: factory.kind.set_hierarchical("TASK_NETWORK_CONSTRAINTS") for method in self.methods: ordering_kind = max(ordering_kind, lvl(method)) for method_cond in method.preconditions: factory.kind.set_hierarchical("METHOD_PRECONDITIONS") factory.update_problem_kind_expression(method_cond) if len(method.non_temporal_constraints()) > 0: factory.kind.set_hierarchical("TASK_NETWORK_CONSTRAINTS") if ordering_kind == TO: factory.kind.set_hierarchical("TASK_ORDER_TOTAL") elif ordering_kind == PO: factory.kind.set_hierarchical("TASK_ORDER_PARTIAL") else: factory.kind.set_hierarchical("TASK_ORDER_TEMPORAL") factory.kind.set_time("CONTINUOUS_TIME") return factory.finalize()
[docs] def has_name(self, name: str) -> bool: """ Returns `True` if the given `name` is already in the `HierarchicalProblem`, `False` otherwise. :param name: The target name to find in the `HierarchicalProblem`. :return: `True` if the given `name` is already in the `HierarchicalProblem`, `False` otherwise. """ return ( self.has_action(name) or self.has_fluent(name) or self.has_object(name) or self.has_type(name) or self.has_task(name) or name in self._methods )
@property def tasks(self) -> List[Task]: return list(self._abstract_tasks.values())
[docs] def get_task(self, task_name: str) -> Task: return self._abstract_tasks[task_name]
[docs] def has_task(self, task_name: str): return task_name in self._abstract_tasks
[docs] def add_task(self, task: Union[Task, str], **kwargs: "up.model.types.Type") -> Task: if isinstance(task, str): task = Task(task, _parameters=OrderedDict(**kwargs)) else: assert len(kwargs) == 0 if self.has_name(task.name): msg = f"Name of task {task.name} already defined! Different elements of a problem can have the same name if the environment flag error_used_name is disabled." if self._env.error_used_name or any( task.name == t for t in self._abstract_tasks ): raise UPProblemDefinitionError(msg) else: warn(msg) self._abstract_tasks[task.name] = task for param in task.parameters: if param.type.is_user_type(): self._add_user_type(param.type) return task
@property def methods(self) -> List[Method]: return list(self._methods.values())
[docs] def method(self, method_name) -> Method: return self._methods[method_name]
[docs] def add_method(self, method: Method): assert ( method.achieved_task is not None ), f"No achieved task was specified for this method." if self.has_name(method.name): msg = f"Name of method {method.name} already defined! Different elements of a problem can have the same name if the environment flag error_used_name is disabled." if self._env.error_used_name or any( method.name == m for m in self._methods ): raise UPProblemDefinitionError(msg) else: warn(msg) assert ( method.achieved_task.task.name in self._abstract_tasks ), f"Method is associated to an unregistered task '{method.achieved_task.task.name}'" self._methods[method.name] = method for param in method.parameters: if param.type.is_user_type(): self._add_user_type(param.type)
@property def task_network(self): return self._initial_task_network