# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module defines the `Action` class and some of his extensions.
"""
import unified_planning as up
from unified_planning.environment import get_environment, Environment
from unified_planning.exceptions import (
UPTypeError,
UPUnboundedVariablesError,
UPProblemDefinitionError,
UPUsageError,
)
from unified_planning.model.mixins.timed_conds_effs import TimedCondsEffs
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Set, Union, Optional, Iterable
from collections import OrderedDict
from unified_planning.model.timing import EndTiming, StartTiming
from unified_planning.model.transition import (
UntimedEffectMixin,
PreconditionMixin,
Transition,
)
class Action(Transition):
"""This is the `Action` interface."""
def __call__(
self,
*args: "up.model.Expression",
agent: Optional["up.model.multi_agent.Agent"] = None,
motion_paths: Optional[
Dict["up.model.tamp.MotionConstraint", "up.model.tamp.Path"]
] = None,
) -> "up.plans.plan.ActionInstance":
params = tuple(args)
return up.plans.plan.ActionInstance(
self, params, agent=agent, motion_paths=motion_paths
)
[docs]
class InstantaneousAction(UntimedEffectMixin, Action, PreconditionMixin):
"""Represents an instantaneous action."""
def __init__(
self,
_name: str,
_parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None,
_env: Optional[Environment] = None,
**kwargs: "up.model.types.Type",
):
Action.__init__(self, _name, _parameters, _env, **kwargs)
PreconditionMixin.__init__(self, _env)
UntimedEffectMixin.__init__(self, _env)
def __repr__(self) -> str:
s = []
s.append(f"action {self.name}")
first = True
for p in self.parameters:
if first:
s.append("(")
first = False
else:
s.append(", ")
s.append(str(p))
if not first:
s.append(")")
s.append(" {\n")
s.append(" preconditions = [\n")
for c in self.preconditions:
s.append(f" {str(c)}\n")
s.append(" ]\n")
s.append(" effects = [\n")
for e in self.effects:
s.append(f" {str(e)}\n")
s.append(" ]\n")
if self._simulated_effect is not None:
s.append(f" simulated effect = {self._simulated_effect}\n")
s.append(" }")
return "".join(s)
def __eq__(self, oth: object) -> bool:
if isinstance(oth, InstantaneousAction):
cond = (
self._environment == oth._environment
and self._name == oth._name
and self._parameters == oth._parameters
)
return (
cond
and set(self._preconditions) == set(oth._preconditions)
and set(self._effects) == set(oth._effects)
and self._simulated_effect == oth._simulated_effect
)
else:
return False
def __hash__(self) -> int:
res = hash(self._name)
for ap in self._parameters.items():
res += hash(ap)
for p in self._preconditions:
res += hash(p)
for e in self._effects:
res += hash(e)
res += hash(self._simulated_effect)
return res
[docs]
def clone(self):
new_params = OrderedDict(
(param_name, param.type) for param_name, param in self._parameters.items()
)
new_instantaneous_action = InstantaneousAction(
self._name, new_params, self._environment
)
new_instantaneous_action._preconditions = self._preconditions[:]
new_instantaneous_action._effects = [e.clone() for e in self._effects]
new_instantaneous_action._fluents_assigned = self._fluents_assigned.copy()
new_instantaneous_action._fluents_inc_dec = self._fluents_inc_dec.copy()
new_instantaneous_action._simulated_effect = self._simulated_effect
return new_instantaneous_action
[docs]
class DurativeAction(Action, TimedCondsEffs):
"""Represents a durative action."""
def __init__(
self,
_name: str,
_parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None,
_env: Optional[Environment] = None,
**kwargs: "up.model.types.Type",
):
Action.__init__(self, _name, _parameters, _env, **kwargs)
TimedCondsEffs.__init__(self, _env)
self._duration: "up.model.timing.DurationInterval" = (
up.model.timing.FixedDuration(self._environment.expression_manager.Int(0))
)
self._continuous_effects: Dict[
"up.model.timing.TimeInterval", List["up.model.effect.Effect"]
] = {}
def __repr__(self) -> str:
s = []
s.append(f"durative action {self.name}")
first = True
for p in self.parameters:
if first:
s.append("(")
first = False
else:
s.append(", ")
s.append(str(p))
if not first:
s.append(")")
s.append(" {\n")
s.append(f" duration = {str(self._duration)}\n")
s.append(" conditions = [\n")
for i, cl in self.conditions.items():
s.append(f" {str(i)}:\n")
for c in cl:
s.append(f" {str(c)}\n")
s.append(" ]\n")
s.append(" effects = [\n")
for t, el in self.effects.items():
s.append(f" {str(t)}:\n")
for e in el:
s.append(f" {str(e)}:\n")
for t, el in self.continuous_effects.items():
s.append(f" {str(t)}:\n")
for e in el:
s.append(f" {str(e)}:\n")
s.append(" ]\n")
s.append(" simulated effects = [\n")
for t, se in self.simulated_effects.items():
s.append(f" {str(t)}: {se}\n")
s.append(" ]\n")
s.append(" }")
return "".join(s)
def __eq__(self, oth: object) -> bool:
if not isinstance(oth, DurativeAction):
return False
if (
self._environment != oth._environment
or self._name != oth._name
or self._parameters != oth._parameters
or self._duration != oth._duration
):
return False
if not TimedCondsEffs.__eq__(self, oth):
return False
if len(self._continuous_effects) != len(oth._continuous_effects):
return False
for t, el in self._continuous_effects.items():
oth_el = oth._continuous_effects.get(t, None)
if oth_el is None:
return False
elif set(el) != set(oth_el):
return False
return True
def __hash__(self) -> int:
res = hash(self._name) + hash(self._duration)
for ap in self._parameters.items():
res += hash(ap)
res += TimedCondsEffs.__hash__(self)
return res
@property
def continuous_effects(self):
return self._continuous_effects
[docs]
def clone(self):
new_params = OrderedDict(
(param_name, param.type) for param_name, param in self._parameters.items()
)
new_durative_action = DurativeAction(self._name, new_params, self._environment)
new_durative_action._duration = self._duration
TimedCondsEffs._clone_to(self, new_durative_action)
new_durative_action._continuous_effects = {
t: [e.clone() for e in el] for t, el in self._continuous_effects.items()
}
return new_durative_action
@property
def duration(self) -> "up.model.timing.DurationInterval":
"""Returns the `action` `duration interval`."""
return self._duration
[docs]
def set_duration_constraint(self, duration: "up.model.timing.DurationInterval"):
"""
Sets the `duration interval` for this `action`.
:param duration: The new `duration interval` of this `action`.
"""
lower, upper = duration.lower, duration.upper
tlower = self._environment.type_checker.get_type(lower)
tupper = self._environment.type_checker.get_type(upper)
assert tlower.is_int_type() or tlower.is_real_type()
assert tupper.is_int_type() or tupper.is_real_type()
if (
lower.is_constant()
and upper.is_constant()
and (
upper.constant_value() < lower.constant_value()
or (
upper.constant_value() == lower.constant_value()
and (duration.is_left_open() or duration.is_right_open())
)
)
):
raise UPProblemDefinitionError(
f"{duration} is an empty interval duration of action: {self.name}."
)
self._duration = duration
[docs]
def set_fixed_duration(self, value: "up.model.expression.NumericExpression"):
"""
Sets the `duration interval` for this `action` as the interval `[value, value]`.
:param value: The `value` set as both edges of this `action's duration`.
"""
(value_exp,) = self._environment.expression_manager.auto_promote(value)
self.set_duration_constraint(up.model.timing.FixedDuration(value_exp))
[docs]
def set_closed_duration_interval(
self,
lower: "up.model.expression.NumericExpression",
upper: "up.model.expression.NumericExpression",
):
"""
Sets the `duration interval` for this `action` as the interval `[lower, upper]`.
:param lower: The value set as the lower edge of this `action's duration`.
:param upper: The value set as the upper edge of this `action's duration`.
"""
lower_exp, upper_exp = self._environment.expression_manager.auto_promote(
lower, upper
)
self.set_duration_constraint(
up.model.timing.ClosedDurationInterval(lower_exp, upper_exp)
)
[docs]
def set_open_duration_interval(
self,
lower: "up.model.expression.NumericExpression",
upper: "up.model.expression.NumericExpression",
):
"""
Sets the `duration interval` for this action as the interval `]lower, upper[`.
:param lower: The value set as the lower edge of this `action's duration`.
:param upper: The value set as the upper edge of this `action's duration`.
Note that `lower` and `upper` are not part of the interval.
"""
lower_exp, upper_exp = self._environment.expression_manager.auto_promote(
lower, upper
)
self.set_duration_constraint(
up.model.timing.OpenDurationInterval(lower_exp, upper_exp)
)
[docs]
def set_left_open_duration_interval(
self,
lower: "up.model.expression.NumericExpression",
upper: "up.model.expression.NumericExpression",
):
"""
Sets the `duration interval` for this `action` as the interval `]lower, upper]`.
:param lower: The value set as the lower edge of this `action's duration`.
:param upper: The value set as the upper edge of this `action's duration`.
Note that `lower` is not part of the interval.
"""
lower_exp, upper_exp = self._environment.expression_manager.auto_promote(
lower, upper
)
self.set_duration_constraint(
up.model.timing.LeftOpenDurationInterval(lower_exp, upper_exp)
)
[docs]
def set_right_open_duration_interval(
self,
lower: "up.model.expression.NumericExpression",
upper: "up.model.expression.NumericExpression",
):
"""
Sets the `duration interval` for this `action` as the interval `[lower, upper[`.
:param lower: The value set as the lower edge of this `action's duration`.
:param upper: The value set as the upper edge of this `action's duration`.
Note that `upper` is not part of the interval.
"""
lower_exp, upper_exp = self._environment.expression_manager.auto_promote(
lower, upper
)
self.set_duration_constraint(
up.model.timing.RightOpenDurationInterval(lower_exp, upper_exp)
)
[docs]
def is_conditional(self) -> bool:
"""Returns `True` if the `action` has `conditional effects`, `False` otherwise."""
# re-implemenation needed for inheritance, delegate implementation.
return TimedCondsEffs.is_conditional(self)
[docs]
def clear_continuous_effects(self):
self._continuous_effects = {}
[docs]
def has_continuous_effects(self):
return len(self._continuous_effects) > 0
[docs]
def add_increase_continuous_effect(
self,
interval: "up.model.timing.TimeInterval",
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
rhs: "up.model.expression.Expression",
):
"""
During the given interval, adds the given `increment` to the `continuous_action's effects`.
:param interval: The interval in which the `increment` is applied.
:param fluent: The `fluent` which value is incremented by the added effect.
:param rhs: The given `fluent` is incremented according to the differential equation `d(fluent)/dt = rhs`.
"""
(
fluent_exp,
rhs_exp,
condition_exp,
) = self._environment.expression_manager.auto_promote(fluent, rhs, True)
if not fluent_exp.is_fluent_exp():
raise UPUsageError(
"fluent field of add_increase_continuous_effect must be a Fluent or a FluentExp"
)
if not fluent_exp.type.is_compatible(rhs_exp.type):
raise UPTypeError(
f"DurativeAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {rhs_exp.type}"
)
if not fluent_exp.type.is_real_type():
raise UPTypeError(
"Increase continuous effects can be created only on real type!"
)
self._add_continuous_effect_instance(
interval,
up.model.effect.Effect(
fluent_exp,
rhs_exp,
condition_exp,
kind=up.model.effect.EffectKind.CONTINUOUS_INCREASE,
forall=tuple(),
),
)
[docs]
def add_decrease_continuous_effect(
self,
interval: "up.model.timing.TimeInterval",
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
rhs: "up.model.expression.Expression",
):
"""
During the given interval, adds the given `decrement` to the `continuous_action's effects`.
:param interval: The interval in which the `decrement` is applied.
:param fluent: The `fluent` which value is decremented by the added effect.
:param rhs: The given `fluent` is decremented according to the differential equation `d(fluent)/dt = - rhs`.
"""
(
fluent_exp,
rhs_exp,
condition_exp,
) = self._environment.expression_manager.auto_promote(fluent, rhs, True)
if not fluent_exp.is_fluent_exp():
raise UPUsageError(
"fluent field of add_decrease_continuous_effect must be a Fluent or a FluentExp"
)
if not fluent_exp.type.is_compatible(rhs_exp.type):
raise UPTypeError(
f"DurativeAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {rhs_exp.type}"
)
if not fluent_exp.type.is_real_type():
raise UPTypeError(
"Decrease continuous effects can be created only on real type!"
)
self._add_continuous_effect_instance(
interval,
up.model.effect.Effect(
fluent_exp,
rhs_exp,
condition_exp,
kind=up.model.effect.EffectKind.CONTINUOUS_DECREASE,
forall=tuple(),
),
)
def _add_continuous_effect_instance(
self,
interval: "up.model.timing.TimeInterval",
continuous_effect: "up.model.effect.Effect",
):
assert (
self._environment == continuous_effect.environment
), "effect does not have the same environment of the action"
assert (
not continuous_effect.is_forall()
), "Continuous effects with forall variables are not supported yet"
self._continuous_effects.setdefault(interval, []).append(continuous_effect)