# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from functools import partial
from itertools import chain
import unified_planning as up
import unified_planning.plans as plans
from unified_planning.model import (
InstantaneousAction,
Timing,
DurativeAction,
Problem,
FNode,
TimepointKind,
Effect,
TimeInterval,
Timepoint,
SimulatedEffect,
)
from unified_planning.environment import Environment
from unified_planning.exceptions import UPUsageError
from typing import (
Callable,
Dict,
Iterator,
Optional,
OrderedDict,
Set,
Tuple,
List,
Union,
)
from fractions import Fraction
class TimeTriggeredPlan(plans.plan.Plan):
    """Represents a time triggered plan."""

    def __init__(
        self,
        actions: List[Tuple[Fraction, "plans.plan.ActionInstance", Optional[Fraction]]],
        environment: Optional["Environment"] = None,
    ):
        """
        Builds the plan from a list of `(start, action_instance, duration)` tuples.

        In every tuple `start` is the absolute time at which the `ActionInstance`
        starts and `duration` is how long it lasts; a duration of `None`
        represents an `InstantaneousAction`.

        :param actions: The timed action instances composing the plan.
        :param environment: The environment of the plan; when it is `None` and
            `actions` is not empty, the environment of the first action is used.
        :raises UPUsageError: If the environment of an action differs from the
            environment of the plan.
        """
        if environment is None and actions:
            # No explicit environment but at least 1 action: adopt the
            # environment of the first action.
            assert len(actions) > 0
            plan_environment = actions[0][1].action.environment
        else:
            # Explicit environment, or nothing to infer one from.
            plan_environment = environment
        plans.plan.Plan.__init__(
            self, plans.plan.PlanKind.TIME_TRIGGERED_PLAN, plan_environment
        )
        # Every action must live in the same environment as the plan.
        for _, action_instance, _ in actions:
            if action_instance.action.environment != self._environment:
                raise UPUsageError(
                    "The environment given to the plan is not the same of the actions in the plan."
                )
        self._actions = actions

    def __repr__(self) -> str:
        return f"TimeTriggeredPlan({self._actions})"

    def __str__(self) -> str:
        # One line per action, ordered by start time; the duration is printed
        # between brackets only when it is set.
        # NOTE(review): the leading whitespace of the per-action lines may have
        # been collapsed when this file was extracted - confirm against upstream.
        lines = ["TimeTriggeredPlan:"]
        for start, ai, dur in sorted(self._actions, key=lambda x: x[0]):
            if dur is None:
                lines.append(f" {float(start)}: {ai}")
            else:
                lines.append(f" {float(start)}: {ai} [{float(dur)}]")
        return "\n".join(lines)

    def __eq__(self, oth: object) -> bool:
        if not isinstance(oth, TimeTriggeredPlan):
            return False
        if len(self._actions) != len(oth._actions):
            return False
        # Position-wise comparison: start, action, parameters and duration
        # must all match.
        mismatch_found = any(
            s != oth_s
            or ai.action != oth_ai.action
            or ai.actual_parameters != oth_ai.actual_parameters
            or d != oth_d
            for (s, ai, d), (oth_s, oth_ai, oth_d) in zip(self._actions, oth._actions)
        )
        return not mismatch_found

    def __hash__(self) -> int:
        # The position takes part in the hash, consistently with the
        # position-wise comparison done by __eq__.
        return sum(
            i + hash(ai.action) + hash(ai.actual_parameters) + hash(s) + hash(d)
            for i, (s, ai, d) in enumerate(self._actions)
        )

    def __contains__(self, item: object) -> bool:
        if not isinstance(item, plans.plan.ActionInstance):
            return False
        return any(item.is_semantically_equivalent(a) for _, a, _ in self._actions)

    @property
    def timed_actions(
        self,
    ) -> List[Tuple[Fraction, "plans.plan.ActionInstance", Optional[Fraction]]]:
        """
        Returns the sequence of tuples (`start`, `action_instance`, `duration`) where:

        - `start` is when the `ActionInstance` starts;
        - `action_instance` is the `grounded Action` applied;
        - `duration` is the (optional) duration of the `ActionInstance`.
        """
        return self._actions

    def replace_action_instances(
        self,
        replace_function: Callable[
            ["plans.plan.ActionInstance"], Optional["plans.plan.ActionInstance"]
        ],
    ) -> "plans.plan.Plan":
        """
        Returns a new `TimeTriggeredPlan` where every `ActionInstance` of the
        current `Plan` is replaced using the given `replace_function`.

        :param replace_function: The function that, applied to an
            `ActionInstance A`, returns the `ActionInstance B` that replaces `A`
            in the resulting `Plan`; when it returns `None`, `A` is dropped.
        :return: The `TimeTriggeredPlan` where every `ActionInstance` is replaced
            using the given `replace_function`.
        """
        replaced = []
        for start, ai, duration in self._actions:
            new_instance = replace_function(ai)
            if new_instance is None:
                continue
            replaced.append((start, new_instance, duration))
        if replaced:
            # The replaced actions may belong to another environment.
            target_env = replaced[0][1].action.environment
        else:
            target_env = self._environment
        return TimeTriggeredPlan(replaced, target_env)

    def convert_to(
        self,
        plan_kind: "plans.plan.PlanKind",
        problem: "up.model.AbstractProblem",
    ) -> "plans.plan.Plan":
        """
        Returns the representation of `self` in the given `plan_kind`.

        :param plan_kind: The plan_kind of the returned plan.
        :param problem: The `Problem` of which this plan is referring to.
        :return: The plan equivalent to self but represented in the kind of
            `plan_kind`.
        :raises UPUsageError: If the conversion to `plan_kind` is not supported.
        """
        if plan_kind == self._kind:
            return self
        if plan_kind == plans.plan.PlanKind.STN_PLAN:
            return _convert_to_stn(self, problem)
        raise UPUsageError(f"{type(self)} can't be converted to {plan_kind}.")
def _absolute_time(
    relative_time: Timing, start: Fraction, duration: Fraction
) -> Fraction:
    """
    Translates a timing relative to an action into an absolute time.

    :param relative_time: The timing in the action.
    :param start: The starting time of the action.
    :param duration: The duration of the action.
    :return: The absolute time of the given timing.
    """
    # The delay is measured either from the start of the action or from its
    # end (start + duration).
    reference = start if relative_time.is_from_start() else start + duration
    return reference + relative_time.delay
def _convert_to_stn(
    time_triggered_plan: TimeTriggeredPlan,
    problem: "up.model.AbstractProblem",
) -> "plans.stn_plan.STNPlan":
    """
    Converts the given `TimeTriggeredPlan` into an `STNPlan`.

    The plan is flattened into instantaneous events: instantaneous actions are
    kept as they are, durative actions are split by
    `_extract_instantenous_actions`. The events, ordered by absolute time, form
    a `SequentialPlan` that is converted to a `PartialOrderPlan`; every edge of
    the partial order becomes a temporal constraint between the start nodes of
    the actions that generated the 2 events.

    :param time_triggered_plan: The plan to convert.
    :param problem: The problem the plan refers to; it must be a `Problem`
        (checked with an assert).
    :return: The `STNPlan` built from the collected constraints.
    """
    # This algorithm takes the TimeTriggeredPlan and converts it to an STNPlan, by
    # removing the temporal dimension, creating a SequentialPlan, then de-ordering the
    # SequentialPlan creating a PartialOrderPlan and re-adding the temporal dimension,
    # getting an STNPlan in the end.
    from unified_planning.plans.stn_plan import STNPlanNode, STNPlan
    assert isinstance(problem, Problem), "This algorithm works only for Problem"
    # Pick the epsilon: the one of the problem if set; otherwise a tenth of the
    # one extracted from the plan capped at 1/1000, or 1/1000 if none is found.
    # NOTE(review): `extract_epsilon` is not defined in the visible part of
    # TimeTriggeredPlan; presumably it is provided elsewhere - confirm.
    epsilon = problem.epsilon
    if epsilon is None:
        epsilon = time_triggered_plan.extract_epsilon(problem)
        if epsilon is None:
            epsilon = Fraction(1, 1000)
        else:
            epsilon = min(epsilon / 10, Fraction(1, 1000))
    assert epsilon is not None
    # Constraints that go in the final STNPlan: every node maps to a list of
    # (lower_bound, upper_bound, other_node) tuples.
    stn_constraints: Dict[
        STNPlanNode, List[Tuple[Optional[Fraction], Optional[Fraction], STNPlanNode]]
    ] = {}
    start_plan_node = STNPlanNode(TimepointKind.GLOBAL_START, None)
    # Mapping from an ActionInstance to its starting STNPlanNode
    ai_to_start_node: Dict["plans.plan.ActionInstance", STNPlanNode] = {}
    # create a mockup durative action that contains the problem's timed_effects and conditions
    mockup_action = DurativeAction("mockup_action")
    for interval, cl in problem.timed_goals.items():
        # Rebuild the interval of the timed goal with START/END timepoints, so
        # that it can be used as a condition interval of the mockup action.
        start_timepoint = Timepoint(
            TimepointKind.START if interval.lower.is_from_start() else TimepointKind.END
        )
        start_timing = Timing(interval.lower.delay, start_timepoint)
        end_timepoint = Timepoint(
            TimepointKind.START if interval.upper.is_from_start() else TimepointKind.END
        )
        end_timing = Timing(interval.upper.delay, end_timepoint)
        relative_interval = TimeInterval(
            start_timing, end_timing, interval.is_left_open(), interval.is_right_open()
        )
        for c in cl:
            mockup_action.add_condition(relative_interval, c)
    for global_timing, el in problem.timed_effects.items():
        # Same rewriting, applied to the timing of every timed effect.
        timepoint = Timepoint(
            TimepointKind.START if global_timing.is_from_start() else TimepointKind.END
        )
        timing = Timing(global_timing.delay, timepoint)
        for e in el:
            mockup_action._add_effect_instance(timing, e)
    mockup_action_instance = plans.ActionInstance(mockup_action)
    # Mapping from an absolute time to the events (instantaneous action
    # instances) happening at that time.
    events: Dict[Fraction, List["plans.plan.ActionInstance"]] = {}
    # Mapping from an event to the action and the relative time that created it
    event_creating_ais: Dict[
        "plans.plan.ActionInstance", Tuple["plans.plan.ActionInstance", Fraction]
    ] = {}
    # The mockup action is handled first, starting at time 0 with duration -1.
    # NOTE(review): presumably the negative duration pushes the timings
    # relative to the end below 0, so that they are skipped below - confirm.
    for start, ai, duration in chain(
        [(Fraction(0), mockup_action_instance, Fraction(-1))],
        time_triggered_plan.timed_actions,
    ):
        # The mockup action is anchored to the global start node of the plan.
        if ai == mockup_action_instance:
            start_node = start_plan_node
        else:
            start_node = STNPlanNode(TimepointKind.START, ai)
        end_node = None
        action = ai.action
        if duration is None:
            assert isinstance(
                action, InstantaneousAction
            ), "Error, None duration specified for non InstantaneousAction"
            # An instantaneous action is its own (only) event.
            events.setdefault(start, []).append(ai)
            event_creating_ais[ai] = (ai, Fraction(0))
            # The action can not start before the start of the plan.
            stn_constraints.setdefault(start_plan_node, []).append(
                (Fraction(0), None, start_node)
            )
        else:
            assert isinstance(
                action, DurativeAction
            ), "Error, Action is not a DurativeAction nor an InstantaneousAction"
            for absolute_timing, inst_action in _extract_instantenous_actions(
                action, start, duration, epsilon
            ):
                # Events falling before time 0 are discarded.
                if absolute_timing < 0:
                    continue
                inst_ai = plans.ActionInstance(inst_action, ai.actual_parameters)
                events.setdefault(absolute_timing, []).append(inst_ai)
                relative_timing = absolute_timing - start
                event_creating_ais[inst_ai] = (ai, relative_timing)
            if ai != mockup_action_instance:
                # The end node must be exactly `duration` after the start node.
                end_node = STNPlanNode(TimepointKind.END, ai)
                assert start_node not in stn_constraints
                stn_constraints[start_node] = [(duration, duration, end_node)]
        ai_to_start_node[ai] = start_node
    # Groups of events scheduled at the very same absolute time.
    simultaneous_events: List[Set["plans.plan.ActionInstance"]] = [
        set(l) for l in events.values() if len(l) > 1
    ]
    sorted_events = sorted(events.items(), key=lambda acts: acts[0])
    # Create the equivalent sequential plan and then deorder it to partial order plan
    list_act = [ia for _, se in sorted_events for ia in se]
    seq_plan = plans.SequentialPlan(list_act)
    partial_order_plan = seq_plan.convert_to(plans.PlanKind.PARTIAL_ORDER_PLAN, problem)
    assert isinstance(partial_order_plan, plans.PartialOrderPlan)
    for ai_current, l_next_ai in partial_order_plan.get_adjacency_list.items():
        # Get the ActionInstance that generated this event and add the constraint between the starting of the action and the start
        # of the action that generated the other event.
        # If the 2 events were simultaneous and have a constraint, they are forced to happen together, otherwise the event
        # that is scheduled later in the original plan is forced to happen later also in the STN (later by an epsilon > 0)
        current_generating_ai, current_skew_time = event_creating_ais[ai_current]
        current_start_node = ai_to_start_node[current_generating_ai]
        # Find the group of simultaneous events containing the current event (if any).
        current_simultaneous_events = None
        for se in simultaneous_events:
            if ai_current in se:
                current_simultaneous_events = se
                break
        for ai_next in l_next_ai:
            next_generating_ai, next_skew_time = event_creating_ais[ai_next]
            next_start_node = ai_to_start_node[next_generating_ai]
            # Events generated by the same action need no additional constraint.
            if current_generating_ai != next_generating_ai:
                upper_bound = None
                if (
                    current_simultaneous_events is not None
                    and ai_next in current_simultaneous_events
                ):
                    # Simultaneous events: the distance between the 2 start
                    # nodes is fixed (lower bound == upper bound).
                    lower_bound = current_skew_time - next_skew_time
                    upper_bound = lower_bound
                else:
                    # Ordered events: only a lower bound, widened by epsilon.
                    lower_bound = current_skew_time - next_skew_time + epsilon
                stn_constraints.setdefault(current_start_node, []).append(
                    (
                        lower_bound,
                        upper_bound,
                        next_start_node,
                    )
                )
    return STNPlan(constraints=stn_constraints)  # type: ignore [arg-type]
def _get_timepoint_conditions(
    action: DurativeAction,
    timing: Fraction,
    start: Fraction,
    duration: Fraction,
) -> List[FNode]:
    """
    Collects the conditions of the given action that hold at the given timing.

    :param action: The action whose conditions are inspected.
    :param timing: The absolute time of interest.
    :param start: The start of the given action.
    :param duration: The duration of the given action.
    :return: The conditions whose interval contains `timing`.
    """
    return [
        condition
        for time_interval, cond_list in action.conditions.items()
        if _is_time_in_interv(start, duration, timing, time_interval)
        for condition in cond_list
    ]
def _get_timepoint_effects(
    action: DurativeAction,
    timing: Fraction,
    start: Fraction,
    duration: Fraction,
) -> List[Effect]:
    """
    Collects the effects of the given action that take place at the given timing.

    :param action: The action whose effects are inspected.
    :param timing: The absolute time of interest.
    :param start: The start of the given action.
    :param duration: The duration of the given action.
    :return: The effects whose absolute time is exactly `timing`.
    """
    return [
        effect
        for effects_timing, el in action.effects.items()
        if _absolute_time(effects_timing, start, duration) == timing
        for effect in el
    ]
def _get_timepoint_simulated_effects(
    action: DurativeAction,
    timing: Fraction,
    start: Fraction,
    duration: Fraction,
) -> Optional[SimulatedEffect]:
    """
    Finds the simulated effect of the given action at the given timing.

    :param action: The action whose simulated effects are inspected.
    :param timing: The absolute time of interest.
    :param start: The start of the given action.
    :param duration: The duration of the given action.
    :return: The simulated effect taking place at `timing`, `None` if there is none.
    :raises NotImplementedError: If 2 simulated effects take place at `timing`.
    """
    found: Optional[SimulatedEffect] = None
    for se_timing, se in action.simulated_effects.items():
        if _absolute_time(se_timing, start, duration) != timing:
            continue
        if found is not None:
            # Adjacent literals keep the message independent from the
            # indentation of this source file.
            raise NotImplementedError(
                "The algorithm to convert from ttp to stn works if there are no conflicting effects. "
                f"The action: {action.name} with duration: {duration} has 2 simulated effects at the same time"
            )
        found = se
    return found
def _extract_action_timings(
    action: DurativeAction,
    start: Fraction,
    duration: Fraction,
    epsilon: Fraction = Fraction(0),
) -> Set[Fraction]:
    """
    Extracts all the interesting absolute timings of the action, i.e. the
    timings where:

    - a condition starts/ends;
    - an effect takes place;
    - a simulated effect takes place.

    The open bounds of a condition interval are moved inside the interval by
    `epsilon`.

    :param action: The action to inspect.
    :param start: The start of the given action.
    :param duration: The duration of the given action.
    :param epsilon: The shift applied to the open bounds of the condition intervals.
    :return: The set of the interesting absolute timings.
    """
    timings: Set[Fraction] = set()
    # Timings of effects and simulated effects.
    timings.update(
        _absolute_time(effect_timing, start, duration)
        for effect_timing in chain(action.effects, action.simulated_effects)
    )
    # Bounds of the condition intervals.
    for interval in action.conditions:
        lower_shift: Fraction = epsilon if interval.is_left_open() else Fraction(0)
        upper_shift: Fraction = epsilon if interval.is_right_open() else Fraction(0)
        lower_bound = _absolute_time(interval.lower, start, duration)
        upper_bound = _absolute_time(interval.upper, start, duration)
        timings.add(lower_bound + lower_shift)
        timings.add(upper_bound - upper_shift)
    return timings
def _extract_instantenous_actions(
    action: DurativeAction, start: Fraction, duration: Fraction, epsilon: Fraction
) -> Iterator[Tuple[Fraction, InstantaneousAction]]:
    """
    Splits a DurativeAction into the InstantaneousActions that compose it.

    For example, an action with an effect at start and one at the end is split
    into 2 InstantaneousActions, one representing the start and one
    representing the end. An InstantaneousAction is created for every timing
    returned by "_extract_action_timings"; it has the parameters of the
    original action and the conditions, effects and simulated effect found at
    that timing.

    :param action: The action to split.
    :param start: The start of the given action.
    :param duration: The duration of the given action.
    :param epsilon: The shift forwarded to "_extract_action_timings".
    :return: An iterator over the pairs (absolute timing, InstantaneousAction).
    """
    interesting_timings = _extract_action_timings(action, start, duration, epsilon)
    for index, timing in enumerate(interesting_timings):
        # Each split action gets its own copy of the parameters.
        parameters = OrderedDict((p.name, p.type) for p in action.parameters)
        inst_action = InstantaneousAction(
            f"{action.name}_{index}", _parameters=parameters
        )
        for condition in _get_timepoint_conditions(action, timing, start, duration):
            inst_action.add_precondition(condition)
        for effect in _get_timepoint_effects(action, timing, start, duration):
            inst_action._add_effect_instance(effect)
        simulated_effect = _get_timepoint_simulated_effects(
            action, timing, start, duration
        )
        if simulated_effect is not None:
            inst_action.set_simulated_effect(simulated_effect)
        yield timing, inst_action
def _is_time_in_interv(
    start: Fraction,
    duration: Fraction,
    timing: Fraction,
    interval: "up.model.timing.TimeInterval",
) -> bool:
    """
    Checks if the given absolute timing falls inside the given interval.

    :param start: The start of the action the interval is relative to.
    :param duration: The duration of the action the interval is relative to.
    :param timing: The absolute time to check.
    :param interval: The interval, relative to the action.
    :return: `True` if `timing` is in the interval, `False` otherwise; an open
        bound excludes the bound itself.
    """
    lower_time = _absolute_time(interval.lower, start=start, duration=duration)
    upper_time = _absolute_time(interval.upper, start=start, duration=duration)
    if interval.is_left_open():
        after_lower = timing > lower_time
    else:
        after_lower = timing >= lower_time
    if interval.is_right_open():
        before_upper = timing < upper_time
    else:
        before_upper = timing <= upper_time
    return after_lower and before_upper