Source code for unified_planning.plans.sequential_plan

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import networkx as nx
import unified_planning as up
import unified_planning.plans as plans
import unified_planning.model.walkers as walkers
from unified_planning.environment import Environment
from unified_planning.exceptions import UPUsageError
from unified_planning.model import FNode, InstantaneousAction, Expression
from typing import Callable, Dict, Optional, Set, List, cast


[docs] class SequentialPlan(plans.plan.Plan): """Represents a sequential plan.""" def __init__( self, actions: List["plans.plan.ActionInstance"], environment: Optional["Environment"] = None, ): # if we have a specific environment or we don't have any actions if environment is not None or not actions: plans.plan.Plan.__init__( self, plans.plan.PlanKind.SEQUENTIAL_PLAN, environment ) # If we don't have a specific environment and have at least 1 action, use the environment of the first action else: assert len(actions) > 0 plans.plan.Plan.__init__( self, plans.plan.PlanKind.SEQUENTIAL_PLAN, actions[0].action.environment ) for ( ai ) in ( actions ): # check that given environment and the environment in the actions is the same if ai.action.environment != self._environment: raise UPUsageError( "The environment given to the plan is not the same of the actions in the plan." ) self._actions = actions def __str__(self) -> str: ret = ["SequentialPlan:"] convert_ai = lambda ai: f" {ai}" ret.extend(map(convert_ai, self._actions)) return "\n".join(ret) def __repr__(self) -> str: return f"SequentialPlan({self._actions})" def __eq__(self, oth: object) -> bool: if isinstance(oth, SequentialPlan) and len(self._actions) == len(oth._actions): for ai, oth_ai in zip(self._actions, oth._actions): if not ai.is_semantically_equivalent(oth_ai): return False return True else: return False def __hash__(self) -> int: count: int = 0 for i, ai in enumerate(self._actions): count += i + hash(ai.action) + hash(ai.actual_parameters) return count def __contains__(self, item: object) -> bool: if isinstance(item, plans.plan.ActionInstance): return any(item.is_semantically_equivalent(a) for a in self._actions) else: return False @property def actions(self) -> List["plans.plan.ActionInstance"]: """Returns the sequence of `ActionInstances`.""" return self._actions
[docs] def replace_action_instances( self, replace_function: Callable[ ["plans.plan.ActionInstance"], Optional["plans.plan.ActionInstance"] ], ) -> "up.plans.plan.Plan": """ Returns a new `SequentialPlan` where every `ActionInstance` of the current `Plan` is replaced using the given function. :param replace_function: The function that applied to an `ActionInstance A` returns the `ActionInstance B`; `B` replaces `A` in the resulting `SequentialPlan`. :return: The `SequentialPlan` where every `ActionInstance` is replaced using the given `replace_function`. """ new_ai = [] for ai in self._actions: replaced_ai = replace_function(ai) if replaced_ai is not None: new_ai.append(replaced_ai) new_env = self._environment if len(new_ai) > 0: new_env = new_ai[0].action.environment return SequentialPlan(new_ai, new_env)
def _to_partial_order_plan( self, problem: "up.model.mixins.ObjectsSetMixin" ) -> "up.plans.partial_order_plan.PartialOrderPlan": """ Returns the `PartialOrderPlan` version of this `SequentialPlan`. This is done by keeping the ordering constraints, given by the `SequentialPlan`, between 2 `ActionInstances` that satisfy one of these conditions: - at least one of the 2 `ActionInstances` writes on a :class:`grounded fluent <unified_planning.model.Fluent>` (writes means that one of his :class:`Effects <unified_planning.model.Effect> assign a value to said `fluent`) - `AND` the other `ActionInstance` reads or writes on the same `grounded fluent` (reads means that one of his preconditions or one of his condition in a conditional effect depends on said fluent). :param problem: The `problem` for which this `SequentialPlan` is created. :return: A `PartialOrderPlan` compatible with the given `problem`. """ subs = self._environment.substituter simp = self._environment.simplifier eqr = walkers.ExpressionQuantifiersRemover(self._environment) fve = self._environment.free_vars_extractor # last_modifier is the mapping from a grounded fluent to the last action instance that assigned a value to # that fluent last_modifier: Dict[FNode, "plans.plan.ActionInstance"] = {} # all_required is the mapping from a grounded fluent to all the action instances that read the value of that # fluent in their preconditions (or in the condition of their conditional effects) all_required: Dict[FNode, List["plans.plan.ActionInstance"]] = {} # graph stores the information gathered through the process graph = nx.DiGraph() for action_instance in self.actions: graph.add_node(action_instance) assert isinstance(action_instance.action, InstantaneousAction) inst_action = cast(InstantaneousAction, action_instance.action) # required_fluents contains all the grounded fluents that this action_instance "reads" required_fluents: Set[FNode] = set() # same of required_fluents, but the fluents are lifted lifted_required_fluents: Set[FNode] = set() # add free vars of preconditions for prec in inst_action.preconditions: lifted_required_fluents |= fve.get( eqr.remove_quantifiers(prec, problem) ) # add in the required_fluents all the free fluents this action instance deals with for effect in inst_action.effects: for eff in effect.expand_effect(problem): lifted_required_fluents |= fve.get( eqr.remove_quantifiers(eff.condition, problem) ) lifted_required_fluents |= fve.get( eqr.remove_quantifiers(eff.fluent, problem) ) lifted_required_fluents |= fve.get( eqr.remove_quantifiers(eff.value, problem) ) assignments: Dict[Expression, Expression] = dict( zip(inst_action.parameters, action_instance.actual_parameters) ) for lifted_fluent in lifted_required_fluents: assert lifted_fluent.is_fluent_exp() for ( arg ) in lifted_fluent.args: # check that we don't have "nested" fluents if len(fve.get(eqr.remove_quantifiers(arg, problem))) != 0: raise UPUsageError( f"The partial deordering of a Sequential Plan does not allow the use of fluents inside the parameter of fluents!\nThe fluent: {lifted_fluent} does violates this contraint." ) required_fluents.add( simp.simplify(subs.substitute(lifted_fluent, assignments)) ) # for every required fluent, add this action instance to the list of action instances that requires this fluent # and order the current action instance after the last modifier of the fluent for required_fluent in required_fluents: action_instance_list = all_required.setdefault(required_fluent, []) action_instance_list.append(action_instance) required_fluent_last_modifier = last_modifier.get(required_fluent, None) if required_fluent_last_modifier is not None: assert ( required_fluent_last_modifier != action_instance ) # sanity check graph.add_edge(required_fluent_last_modifier, action_instance) # for every effect, set current action instance as the last modifier and the current action instance is ordered # after every action instance that requires a fluent the current action instance modifies for effect in inst_action.effects: for eff in effect.expand_effect(problem): assert eff.fluent.is_fluent_exp() grounded_fluent = simp.simplify( subs.substitute(eff.fluent, assignments) ) last_modifier[grounded_fluent] = action_instance dependent_action_instance_list = all_required.setdefault( grounded_fluent, [] ) for dependent_action_instance in dependent_action_instance_list: if dependent_action_instance != action_instance: graph.add_edge(dependent_action_instance, action_instance) assert nx.is_directed_acyclic_graph(graph) # remove redundant edges and return the up.plans.partial_order_plan.PartialOrderPlan structure. return up.plans.partial_order_plan.PartialOrderPlan( {}, self._environment, nx.transitive_reduction(graph) )
[docs] def convert_to( self, plan_kind: "plans.plan.PlanKind", problem: "up.model.AbstractProblem", ) -> "plans.plan.Plan": """ This function takes a `PlanKind` and returns the representation of `self` in the given `plan_kind`. If the conversion does not make sense, raises an exception. :param plan_kind: The plan_kind of the returned plan. :param problem: The `Problem` of which this plan is referring to. :return: The plan equivalent to self but represented in the kind of `plan_kind`. """ if plan_kind == self._kind: return self elif plan_kind == plans.plan.PlanKind.PARTIAL_ORDER_PLAN: assert isinstance(problem, up.model.mixins.ObjectsSetMixin) return self._to_partial_order_plan(problem) else: raise UPUsageError(f"{type(self)} can't be converted to {plan_kind}.")