Source code for unified_planning.plans.hierarchical_plan

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from dataclasses import dataclass, field
from typing import Union, Dict, Optional, Tuple, List, OrderedDict, Callable

from unified_planning.model.abstract_problem import AbstractProblem

from unified_planning.exceptions import UPUsageError
from unified_planning.model.fnode import FNode
from unified_planning.plans.time_triggered_plan import TimeTriggeredPlan
from unified_planning.plans.sequential_plan import SequentialPlan
from unified_planning.plans.plan import Plan, PlanKind
from unified_planning.model.htn import Method
from unified_planning.plans.plan import ActionInstance


FlatPlan = Union[SequentialPlan, TimeTriggeredPlan]


@dataclass
class Decomposition:
    """A decomposition associates each of a set of task identifiers to either a method or an action that achieves it."""

    subtasks: Dict[str, Union["MethodInstance", ActionInstance]] = field(
        default_factory=dict
    )

    def __repr__(self):
        s = []
        for id, dec in self.subtasks.items():
            s.append(id)
            s.append(" -> ")
            s.append(str(dec).replace("\n", "\n  "))  # indent
            s.append("\n")
        if len(s) > 0:
            s.pop(-1)  # remove last line break
        return "".join(s)

    def __eq__(self, other):
        return self is other

    def __hash__(self):
        return sum(map(hash, self.subtasks.keys()))

    def is_semantically_equivalent(self, other: "Decomposition") -> bool:
        if set(self.subtasks.keys()) != set(other.subtasks.keys()):
            return False
        for k in self.subtasks:
            my = self.subtasks[k]
            ot = other.subtasks[k]
            if isinstance(my, MethodInstance):
                if not isinstance(
                    ot, MethodInstance
                ) or not my.is_semantically_equivalent(ot):
                    return False
            elif isinstance(my, ActionInstance):
                if not isinstance(
                    ot, ActionInstance
                ) or not my.is_semantically_equivalent(ot):
                    return False
            else:
                raise ValueError
        return True

    def _accumulate_instances(
        self,
        id_prefix: str,
        out: List[Tuple[str, Union["MethodInstance", ActionInstance]]],
    ):
        for id, dec in self.subtasks.items():
            if isinstance(dec, ActionInstance):
                out.append((f"{id_prefix}{id}", dec))
            else:
                assert isinstance(dec, MethodInstance)
                method_id = f"{id_prefix}{id}::{dec.method.name}"
                out.append((method_id, dec))
                dec.decomposition._accumulate_instances(method_id + "::", out)

    def _replace_action_instances(
        self, replace_function: Callable[[ActionInstance], Optional[ActionInstance]]
    ) -> "Decomposition":
        def replace(instance):
            if isinstance(instance, ActionInstance):
                rep = replace_function(instance)
                if rep is None:
                    raise UPUsageError(
                        "Cannot remove an action from a hierarchical plan"
                    )
                return rep
            else:
                assert isinstance(instance, MethodInstance)
                return instance._replace_action_instances(replace_function)

        return Decomposition(
            {task: replace(dec) for task, dec in self.subtasks.items()}
        )


@dataclass
class MethodInstance:
    """An instantiation of a method, including its parameter and a decomposition for each of its subtasks."""

    method: Method
    parameters: Tuple[FNode, ...]
    decomposition: "Decomposition" = Decomposition()

    def __repr__(self):
        return f"{self.method.name}{self.parameters}\n{self.decomposition}"

    def __eq__(self, other):
        return self is other

    def __hash__(self):
        return hash(self.method) + sum(map(hash, self.parameters))

    def is_semantically_equivalent(self, other: "MethodInstance") -> bool:
        return (
            self.method == other.method
            and self.parameters == other.parameters
            and self.decomposition.is_semantically_equivalent(other.decomposition)
        )

    def _replace_action_instances(
        self, replace_function: Callable[[ActionInstance], Optional[ActionInstance]]
    ) -> "MethodInstance":
        return MethodInstance(
            self.method,
            self.parameters,
            self.decomposition._replace_action_instances(replace_function),
        )


[docs] class HierarchicalPlan(Plan): """A `HierarchicalPlan` represents a solution to a `HierarchicalProblem`. It provides the combination of a "flat plan" a set of action with ordering information, and for each task in the initial task network, its decomposition into methods and primitive actions. """ def __init__(self, flat_plan: FlatPlan, decomposition: Decomposition): super().__init__(PlanKind.HIERARCHICAL_PLAN) self._flat_plan = flat_plan self._decomposition = decomposition def __eq__(self, other): return ( isinstance(other, HierarchicalPlan) and self.action_plan == other.action_plan and self.decomposition.is_semantically_equivalent(other.decomposition) ) def __hash__(self): return hash(self._flat_plan) + hash(self._decomposition) def __repr__(self): return ( f"# Action Plan\n{self.action_plan}\n# Decomposition\n{self.decomposition}" ) def __str__(self) -> str: return "Hierarchical " + str(self.action_plan) @property def decomposition(self) -> Decomposition: """The decomposition of the initial task network.""" return self._decomposition @property def action_plan(self) -> FlatPlan: """A flat plan that contains hierarchical information.""" return self._flat_plan def _instances(self) -> List[Tuple[str, Union["MethodInstance", ActionInstance]]]: out: List[Tuple[str, Union["MethodInstance", ActionInstance]]] = [] self.decomposition._accumulate_instances("", out) return out
[docs] def actions(self) -> List[Tuple[str, ActionInstance]]: """Returns a list of all actions in the plan, together with a unique and stable identifier.""" return list( filter(lambda ins: isinstance(ins[1], ActionInstance), self._instances()) # type: ignore[arg-type] )
[docs] def methods(self) -> List[Tuple[str, MethodInstance]]: """Returns a list of all methods in the plan, together with a unique and stable identifier.""" return list( filter(lambda ins: isinstance(ins[1], MethodInstance), self._instances()) # type: ignore[arg-type] )
[docs] def replace_action_instances( self, replace_function: Callable[[ActionInstance], Optional[ActionInstance]] ) -> "Plan": decomposition = self._decomposition._replace_action_instances(replace_function) flat_plan = self._flat_plan.replace_action_instances(replace_function) assert isinstance(flat_plan, (SequentialPlan, TimeTriggeredPlan)) return HierarchicalPlan(flat_plan, decomposition)
[docs] def convert_to(self, plan_kind: PlanKind, problem: AbstractProblem) -> "Plan": if plan_kind == PlanKind.HIERARCHICAL_PLAN: return self elif plan_kind in [PlanKind.SEQUENTIAL_PLAN, PlanKind.TIME_TRIGGERED_PLAN]: # NOTE: we cannot rely on automatic conversion to PARTIAL_ORDER_PLAN or STN_PLAN # because, the hierarchy induces constraints on action orders that would not be accounted for # by translators only aware of the flat structure return self._flat_plan.convert_to(plan_kind, problem) else: raise NotImplementedError( f"Unavailable conversion from hierarchical plan to {plan_kind.name}" )