Source code for unified_planning.model.htn.task

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module defines the Task class.
A Task has a name and a signature that defines the types of its parameters.
"""

import unified_planning as up
from unified_planning.environment import get_environment, Environment
from typing import List, OrderedDict, Optional, Union
from unified_planning.model.fnode import FNode
from unified_planning.model.action import Action
from unified_planning.model.timing import Timepoint, TimepointKind
from unified_planning.model.types import Type
from unified_planning.model.expression import Expression
from unified_planning.model.parameter import Parameter


[docs] class Task: """Represents an abstract task.""" def __init__( self, name: str, _parameters: Optional[Union[OrderedDict[str, Type], List[Parameter]]] = None, _env: Optional[Environment] = None, **kwargs: Type, ): self._env = get_environment(_env) self._name = name self._parameters: List[Parameter] = [] if _parameters is not None: assert len(kwargs) == 0 if isinstance(_parameters, OrderedDict): for param_name, param_type in _parameters.items(): self._parameters.append( up.model.parameter.Parameter(param_name, param_type, self._env) ) elif isinstance(_parameters, List): self._parameters = _parameters[:] else: raise NotImplementedError else: for param_name, param_type in kwargs.items(): self._parameters.append( up.model.parameter.Parameter(param_name, param_type, self._env) ) def __repr__(self) -> str: sign = "" if len(self.parameters) > 0: sign_items = [f"{p.name}={str(p.type)}" for p in self.parameters] sign = f'[{", ".join(sign_items)}]' return f"{self.name}{sign}" def __eq__(self, oth: object) -> bool: if isinstance(oth, Task): return ( self._name == oth._name and self._parameters == oth._parameters and self._env == oth._env ) else: return False def __hash__(self) -> int: return hash(self._name) + sum(map(hash, self._parameters)) @property def name(self) -> str: """Returns the task's name.""" return self._name @property def parameters(self) -> List[Parameter]: """Returns the task's parameters as a list.""" return self._parameters def __call__(self, *args: Expression, ident: Optional[str] = None) -> "Subtask": """Returns a subtask with the given parameters.""" return Subtask(self, *self._env.expression_manager.auto_promote(args))
# global counter to enable the creation of unique identifiers. _task_id_counter = 0 class Subtask: def __init__( self, _task: Union[Action, Task], *args: Expression, ident: Optional[str] = None, _env: Optional[Environment] = None, ): self._env = get_environment(_env) self._task = _task self._ident: str if ident is not None: self._ident = ident else: # we have to create an unambiguous identifier as there might otherwise identical tasks global _task_id_counter _task_id_counter += 1 self._ident = f"_t{_task_id_counter}" self._args = self._env.expression_manager.auto_promote(*args) if len(self._args) != len(self._task.parameters): raise ValueError( f"Wrong number of arguments. Expected: {self._task.parameters}. Provided: {self._args}" ) def __repr__(self): params = ", ".join([str(a) for a in self._args]) return f"{self.identifier}: {self._task.name}({params})" def __eq__(self, other): if not isinstance(other, Subtask): return False return ( self._env == other._env and self._ident == other._ident and self._task == other._task and self._args == other._args ) def __hash__(self): return hash(self._ident) + hash(self._task) + sum(map(hash, self._args)) @property def task(self) -> Union[Task, Action]: return self._task @property def parameters(self) -> List["FNode"]: return self._args @property def identifier(self) -> str: """Unique identifier of the subtask in its task network.""" return self._ident @property def start(self) -> Timepoint: """Timepoint representing the task's end time.""" return Timepoint(TimepointKind.START, container=self.identifier) @property def end(self) -> Timepoint: """Timepoint representing the task's end time.""" return Timepoint(TimepointKind.END, container=self.identifier)