# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from itertools import chain, product
from numbers import Real
import unified_planning as up
import unified_planning.plans as plans
from unified_planning.environment import Environment
from unified_planning.exceptions import UPUsageError
from unified_planning.model import DeltaSimpleTemporalNetwork, TimepointKind
from unified_planning.plans.plan import ActionInstance
from fractions import Fraction
from dataclasses import dataclass
from typing import (
Callable,
Dict,
Iterator,
List,
Optional,
Set,
Tuple,
Union,
cast,
)
@dataclass(unsafe_hash=True, frozen=True)
class STNPlanNode:
"""
This class represents a node of the `STNPlan`.
:param kind: The `TimepointKind` of this node, it can be `global`, referring
to the `START` or the `END` of the `Plan` itself, or `not global`,
representing the `START` or the `END` of the given `ActionInstance`.
:param action_instance: Optionally, the `ActionInstance` that this node
represents. If the `kind` is `global`, this field must be `None`.
"""
kind: TimepointKind
action_instance: Optional[ActionInstance] = None
def __post_init___(self):
if (
self.kind in (TimepointKind.GLOBAL_START, TimepointKind.GLOBAL_END)
and self.action_instance is not None
):
raise UPUsageError(
f"A global kind represents Start/End of the plan;",
"the ActionInstance is not accepted.",
)
if (
self.kind in (TimepointKind.START, TimepointKind.END)
and self.action_instance is None
):
raise UPUsageError(
f"kind represents Start/End of an ActionInstance",
"but the ActionInstance is not given.",
)
def __repr__(self) -> str:
mappings: Dict[TimepointKind, str] = {
TimepointKind.GLOBAL_START: "START PLAN",
TimepointKind.GLOBAL_END: "END PLAN",
TimepointKind.START: "START ACTION",
TimepointKind.END: "END ACTION",
}
res: List[str] = []
res.append(mappings[self.kind])
if self.action_instance is not None:
res.append(str(self.action_instance))
return " ".join(res)
@property
def environment(self) -> Optional[Environment]:
if self.action_instance is not None:
return self.action_instance.action.environment
return None
def flatten_dict_structure(
d: Dict[STNPlanNode, List[Tuple[Optional[Real], Optional[Real], STNPlanNode]]]
) -> Iterator[Tuple[STNPlanNode, Optional[Real], Optional[Real], STNPlanNode]]:
"""
This method takes a dict containing a List of tuples of 3 elements, and
returns an Iterator of Tuples of 4 elements, where the first one is the key
and the other 3 are the elements in the list.
:param d: The dictionary to flatten.
"""
for k, v in d.items():
if not v:
yield (k, None, None, k)
for tup in v:
assert len(tup) == 3, str(tup)
yield (k, *tup)
[docs]
class STNPlan(plans.plan.Plan):
"""
Represents a `STNPlan`. A Simple Temporal Network plan is a generalization of
a `TimeTriggeredPlan`, where the only constraints are among the start and the
end of the different `ActionInstances` or among the `start` and the `end` of the
plan.
An `STNPlan` is consistent if exists a time assignment for each `STNPlanNode`
that does not violate any constraint; otherwise the `STNPlan` is inconsistent.
"""
def __init__(
self,
constraints: Union[
List[Tuple[STNPlanNode, Optional[Real], Optional[Real], STNPlanNode]],
Dict[STNPlanNode, List[Tuple[Optional[Real], Optional[Real], STNPlanNode]]],
],
environment: Optional["Environment"] = None,
_stn: Optional[DeltaSimpleTemporalNetwork[Fraction]] = None,
):
"""
Constructs the `STNPlan` with 2 different possible representations:
one as a `List` of `Tuples`, where each `Tuple` contains: `STNPlanNode A`,
the lower bound `L`, the upper bound `U` and the other `STNPlanNode B`
the other one as a `Dict` from `STNPlanNode A` to the `List` of `Tuples`,
where each `Tuple` contains: the lower bound `L`, the upper bound `U`
and the other `STNPlanNode B`.
The semantic is the same for the 2 representations and the temporal
constraints are represented like `L <= Time(A) - Time(B) <= U`, where
`Time[STNPlanNode]` is the time in which the STNPlanNode happen.
:param constraints: The data structure to create the `STNPlan`, explained
in details above.
:param environment: The environment in which the `ActionInstances` in the
constraints are created; this parameters is ignored if there is
another environment in the action instances given in the constraints.
:param _stn: Internal parameter, not to be used!
:return: The created `STNPlan`.
"""
assert (
_stn is None or not constraints
), "_stn and constraints can't be both given"
# if we have a specific env or we don't have any actions
env = environment
if (env is not None or not constraints) and _stn is None:
plans.plan.Plan.__init__(self, plans.plan.PlanKind.STN_PLAN, env)
# If we don't have a specific env, use the env of the first action
elif _stn is not None:
for r_node, cl in _stn.get_constraints().items():
assert isinstance(r_node, STNPlanNode), "Given _stn is wrong"
if r_node.environment is not None:
env = r_node.environment
else:
for _, l_node in cl:
assert isinstance(l_node, STNPlanNode), "Given _stn is wrong"
if l_node.environment is not None:
env = l_node.environment
break
if env is not None:
break
plans.plan.Plan.__init__(self, plans.plan.PlanKind.STN_PLAN, env)
self._stn: DeltaSimpleTemporalNetwork[Fraction] = _stn
return
elif isinstance(constraints, Dict):
for k_node, l in constraints.items():
if k_node.environment is not None:
env = k_node.environment
else:
for _, _, v_node in l:
if v_node.environment is not None:
env = v_node.environment
break
if env is not None:
break
plans.plan.Plan.__init__(self, plans.plan.PlanKind.STN_PLAN, env)
else:
assert isinstance(constraints, List), "Typing not respected"
for a_node, _, _, b_node in constraints:
if a_node.environment is not None:
env = a_node.environment
break
elif b_node.environment is not None:
env = b_node.environment
break
plans.plan.Plan.__init__(self, plans.plan.PlanKind.STN_PLAN, env)
# Create and populate the DeltaSTN
self._stn = DeltaSimpleTemporalNetwork()
if isinstance(constraints, List):
gen: Iterator[
Tuple[STNPlanNode, Optional[Real], Optional[Real], STNPlanNode]
] = iter(constraints)
else:
assert isinstance(constraints, Dict), "Typing not respected"
gen = flatten_dict_structure(constraints)
start_plan = STNPlanNode(TimepointKind.GLOBAL_START)
end_plan = STNPlanNode(TimepointKind.GLOBAL_END)
assert start_plan is not None and end_plan is not None
f0 = Fraction(0)
self._stn.insert_interval(start_plan, end_plan, left_bound=f0)
for a_node, lower_bound, upper_bound, b_node in gen:
if (
a_node.environment is not None
and a_node.environment != self._environment
) or (
b_node.environment is not None
and b_node.environment != self._environment
):
raise UPUsageError(
"Different environments given inside the same STNPlan!"
)
if a_node != start_plan:
self._stn.insert_interval(start_plan, a_node, left_bound=f0)
if a_node != end_plan:
self._stn.insert_interval(a_node, end_plan, left_bound=f0)
if b_node != start_plan:
self._stn.insert_interval(start_plan, b_node, left_bound=f0)
if b_node != end_plan:
self._stn.insert_interval(b_node, end_plan, left_bound=f0)
lb = None
if isinstance(lower_bound, Fraction):
lb = lower_bound
elif lower_bound is not None:
lb = Fraction(float(lower_bound))
ub = None
if isinstance(upper_bound, Fraction):
ub = upper_bound
elif upper_bound is not None:
ub = Fraction(float(upper_bound))
self._stn.insert_interval(a_node, b_node, left_bound=lb, right_bound=ub)
def __repr__(self) -> str:
return str(self._stn)
def __str__(self) -> str:
# give an ID, starting from 0, to every STNPlanNode in the Plan
swap_couple = lambda x: (x[1], x[0])
id: Dict[STNPlanNode, int] = dict(
map(swap_couple, enumerate(self._stn.distances.keys()))
)
convert_action_id = lambda action_id: f" {action_id[1]}) {action_id[0]}"
ret = ["STNPlan:", " Actions:"]
ret.extend(map(convert_action_id, id.items()))
ret.append(" Constraints:")
def convert_constraint(constraint):
left_element, lower_bound, upper_bound, right_element = constraint
if lower_bound is None:
str_lower_bound = "-inf"
elif lower_bound.denominator == 1:
str_lower_bound = str(lower_bound.numerator)
else:
str_lower_bound = str(float(lower_bound))
if upper_bound is None:
str_upper_bound = "+inf"
elif upper_bound.denominator == 1:
str_upper_bound = str(upper_bound.numerator)
else:
str_upper_bound = str(float(upper_bound))
return f" {id[left_element]} --[{str_lower_bound}, {str_upper_bound}]--> {id[right_element]}"
constraints = cast(
Dict[STNPlanNode, List[Tuple[Optional[Real], Optional[Real], STNPlanNode]]],
self.get_constraints(),
)
ret.extend(map(convert_constraint, flatten_dict_structure(constraints)))
return "\n".join(ret)
def __eq__(self, oth: object) -> bool:
if isinstance(oth, STNPlan):
self_contraints = self.get_constraints()
oth_constraints = oth.get_constraints()
if len(self_contraints) != len(oth_constraints):
return False
for k, self_cl in self_contraints.items():
oth_cl = oth_constraints.get(k, None)
if oth_cl is None or len(oth_cl) != len(self_cl):
return False
for self_c in self_cl:
if not self_c in oth_cl:
return False
return False
else:
return False
def __hash__(self) -> int:
count = 0
for k, cl in self.get_constraints().items():
for lb, ub, v in cl:
count += hash((k, lb, ub, v))
return count
def __contains__(self, item: object) -> bool:
if isinstance(item, ActionInstance):
return any(
n.action_instance is not None
and item.is_semantically_equivalent(n.action_instance)
for n in self._stn.distances
)
else:
return False
[docs]
def get_constraints(
self,
) -> Dict[
STNPlanNode, List[Tuple[Optional[Fraction], Optional[Fraction], STNPlanNode]]
]:
"""
Returns all the constraints given by this `STNPlan`. Subsumed constraints
are removed, this means that the constraints returned by this method are
only the stricter.
The mapping returned is from the node `A` to the `List` of `Tuple`
containing `lower_bound L`, `upper_bound U` and the node `B`.
The semantic is `L <= Time(A) - Time(B) <= U`, where `Time[STNPlanNode]`
is the time in which the `STNPlanNode` happen. `L` or `U` can be `None`,
this means that the lower/upper bound is not set.
"""
upper_bounds: Dict[Tuple[STNPlanNode, STNPlanNode], Fraction] = {}
lower_bounds: Dict[Tuple[STNPlanNode, STNPlanNode], Fraction] = {}
for b_node, l in self._stn.get_constraints().items():
for upper_bound, a_node in l:
if upper_bound > 0:
# Sets the upper bound for b-a; b-a is represented as the
# Tuple[smaller_node, bigger_node], so it is (a, b).
key = (a_node, b_node)
upper_bounds[key] = min(
upper_bound, upper_bounds.get(key, upper_bound)
)
else:
# If the bound is negative, it is represented as a lower bound for a-b
# instead of an upper bound for b-a (b-a <= x) -> (a-b >= -x)
key = (b_node, a_node)
lower_bounds[key] = max(
-upper_bound, lower_bounds.get(key, -upper_bound)
)
constraints: Dict[
STNPlanNode,
List[Tuple[Optional[Fraction], Optional[Fraction], STNPlanNode]],
] = {}
seen_couples: Set[Tuple[STNPlanNode, STNPlanNode]] = set()
for (left_node, right_node), upper_bound in upper_bounds.items():
key = (left_node, right_node)
seen_couples.add(key)
lower_bound = lower_bounds.get(key, None)
cl = constraints.setdefault(left_node, [])
cl.append((lower_bound, upper_bound, right_node))
for (left_node, right_node), lower_bound in lower_bounds.items():
key = (left_node, right_node)
if key not in seen_couples:
seen_couples.add(key)
cl = constraints.setdefault(left_node, [])
cl.append((lower_bound, None, right_node))
return constraints
[docs]
def replace_action_instances(
self,
replace_function: Callable[
["plans.plan.ActionInstance"], Optional["plans.plan.ActionInstance"]
],
) -> "plans.plan.Plan":
"""
Returns a new `STNPlan` where every `ActionInstance` of the current plan is replaced using the given `replace_function`.
:param replace_function: The function that applied to an `ActionInstance A` returns the `ActionInstance B`; `B`
replaces `A` in the resulting `Plan`.
:return: The `STNPlan` where every `ActionInstance` is replaced using the given `replace_function`.
"""
replaced_action_instances: Dict[ActionInstance, Optional[ActionInstance]] = {}
replaced_nodes: Dict[STNPlanNode, STNPlanNode] = {}
nodes_to_remove: Set[STNPlanNode] = set()
for node in self._stn.distances:
assert isinstance(node, STNPlanNode)
ai = node.action_instance
if ai is None:
replaced_nodes[node] = node
continue
replaced_ai = replaced_action_instances.setdefault(ai, replace_function(ai))
if replaced_ai is None:
nodes_to_remove.add(node)
replaced_nodes[node] = node
else:
replaced_nodes[node] = STNPlanNode(node.kind, replaced_ai)
stn_constraints = self._stn.get_constraints()
# right_nodes are the nodes to the right of the nodes that must be remove.
right_nodes: Dict[STNPlanNode, Set[Tuple[STNPlanNode, Fraction]]] = {}
left_nodes: Dict[STNPlanNode, Set[Tuple[STNPlanNode, Fraction]]] = {}
new_constraints: Dict[STNPlanNode, List[Tuple[Fraction, STNPlanNode]]] = {}
for r_node, constraints in stn_constraints.items():
replaced_r_node = replaced_nodes[r_node]
new_rrn_constraints: List[
Tuple[Fraction, STNPlanNode]
] = [] # rrn means replaced_right_node
# the nodes added on left_nodes_set are on the left of the current node
left_nodes_set: Optional[Set[Tuple[STNPlanNode, Fraction]]] = (
left_nodes.setdefault(replaced_r_node, set())
if replaced_r_node in nodes_to_remove
else None
)
for bound, l_node in constraints:
replaced_l_node = replaced_nodes[l_node]
if left_nodes_set is not None:
left_nodes_set.add((replaced_l_node, bound))
if replaced_l_node in nodes_to_remove:
right_nodes.setdefault(replaced_l_node, set()).add(
(replaced_r_node, bound)
)
new_rrn_constraints.append((bound, replaced_l_node))
new_constraints[replaced_r_node] = new_rrn_constraints
for ntr in nodes_to_remove:
left_nodes_set = left_nodes.get(ntr, set())
assert left_nodes_set is not None
right_nodes_set = right_nodes.get(ntr, set())
for (l_node, l_dist), (r_node, r_dist) in product(
left_nodes_set, right_nodes_set
):
r_node_constraints = new_constraints.setdefault(r_node, [])
sum_dist = l_dist + r_dist
r_node_constraints.append((sum_dist, l_node))
if l_node in nodes_to_remove:
right_nodes.setdefault(l_node, set()).add((r_node, sum_dist))
if r_node in nodes_to_remove:
left_nodes.setdefault(r_node, set()).add((l_node, sum_dist))
new_stn: DeltaSimpleTemporalNetwork = DeltaSimpleTemporalNetwork()
for r_node, constraints in new_constraints.items():
if not r_node in nodes_to_remove:
for bound, l_node in constraints:
if not l_node in nodes_to_remove:
new_stn.add(r_node, l_node, bound)
return STNPlan(constraints={}, environment=self._environment, _stn=new_stn)
[docs]
def convert_to(
self,
plan_kind: "plans.plan.PlanKind",
problem: "up.model.AbstractProblem",
) -> "plans.plan.Plan":
"""
This function takes a `PlanKind` and returns the representation of `self`
in the given `plan_kind`. If the conversion does not make sense, raises
an exception.
:param plan_kind: The plan_kind of the returned plan.
:param problem: The `Problem` of which this plan is referring to.
:return: The plan equivalent to self but represented in the kind of
`plan_kind`.
"""
if plan_kind == self._kind:
return self
elif plan_kind == plans.plan.PlanKind.TIME_TRIGGERED_PLAN:
return self._convert_to_time_triggered(problem)
else:
raise UPUsageError(f"{type(self)} can't be converted to {plan_kind}.")
[docs]
def is_consistent(self) -> bool:
"""
Returns True if if exists a time assignment for each STNPlanNode that
does not violate any constraint; False otherwise.
"""
return self._stn.check_stn()
def _convert_to_time_triggered(
self, problem: "up.model.AbstractProblem"
) -> "plans.plan.Plan":
# Mapping from an actionInstance to it's starting minimum time and ending maximum time
action_instance_map: Dict[
ActionInstance, Tuple[Optional[Fraction], Optional[Fraction]]
] = {}
for node, time in map(lambda x: (x[0], -x[1]), self._stn.distances.items()):
assert time >= 0
if time == 0:
time = Fraction(0)
if node.kind in (TimepointKind.GLOBAL_START, TimepointKind.GLOBAL_END):
continue
ai = node.action_instance
assert ai is not None
start, end = action_instance_map.get(ai, (None, None))
if node.kind == TimepointKind.START:
assert start is None
start = time
else:
assert node.kind == TimepointKind.END
assert end is None
end = time
action_instance_map[ai] = (start, end)
ttp_actions: List[Tuple[Fraction, ActionInstance, Optional[Fraction]]] = []
for ai, (start, end) in sorted(action_instance_map.items(), key=lambda x: x[1][0]): # type: ignore [arg-type, return-value]
assert start is not None
duration = None if end is None else end - start
ttp_actions.append((start, ai, duration))
return plans.time_triggered_plan.TimeTriggeredPlan(ttp_actions)