# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
import unified_planning as up
import networkx as nx
from unified_planning.io.anml_grammar import (
TK_ALL,
TK_AND,
TK_ASSIGN,
TKS_DECREASE,
TK_DIV,
TK_DURATION,
TK_END,
TK_EQUALS,
TK_FALSE,
TK_FORALL,
TK_EXISTS,
TK_GE,
TK_GT,
TK_IMPLIES,
TKS_INCREASE,
TK_L_BRACKET,
TK_L_PARENTHESIS,
TK_LE,
TK_LT,
TK_MINUS,
TK_NOT,
TK_NOT_EQUALS,
TK_OR,
TK_PLUS,
TK_R_BRACKET,
TK_R_PARENTHESIS,
TK_START,
TK_TIMES,
TK_TRUE,
TK_WHEN,
TK_XOR,
ANMLGrammar,
TK_BOOLEAN,
TK_INTEGER,
TK_INFINITY,
TK_FLOAT,
)
from unified_planning.environment import Environment, get_environment
from unified_planning.exceptions import ANMLSyntaxError, UPUnsupportedProblemTypeError
from unified_planning.model import (
DurationInterval,
Effect,
EffectKind,
FNode,
StartTiming,
GlobalStartTiming,
EndTiming,
GlobalEndTiming,
FixedDuration,
Timing,
TimeInterval,
Type,
Parameter,
Variable,
)
from fractions import Fraction
from typing import Dict, Sequence, Set, Tuple, Union, Callable, List, Optional
from pyparsing import ParseResults
from unified_planning.io.utils import parse_string, parse_file
# [docs]
class ANMLReader:
"""
Class that offers the capability, with the :func:`parse_problem <unified_planning.io.ANMLReader.parse_problem>`, to create a :class:`~unified_planning.model.Problem` from an
**ANML** file.
The assumptions made in order for this Reader to work are the following:
#. statements containing the duration of an action are **not** mixed with other statements ( ``duration == 3 and at(l_from);`` ).
#. the action duration can be set with:
* ``duration == expression;``
* ``duration := expression;``
* ``duration CT expression and duration CT expression``, where ``CT`` are Compare Tokens, so ``>``, ``>=``, ``<`` and ``<=``.
All the other ways to define the duration of an Action are not supported.
#. Statements containing both conditions and effects are **not** supported ( ``(at(l_from) == true) := false);`` or ``(at(l_from) == true) and (at(l_from) := false);`` ).
#. Quantifier body does not support intervals, they can only be defined outside.
#. Forall over the assignments are not nested.
#. Only one effect can be specified in a forall over assignments.
#. No conditions can be specified in a forall over assignments, 2 different foralls must be specified.
#. Conditional effects are not supported inside an expression block.
"""
def __init__(self, env: Optional[Environment] = None):
    """Initialize the reader, binding it to the given (or the global) Environment.

    :param env: Optional Environment; when None the global environment is used.
    """
    self._env = get_environment(env)
    self._em = self._env.expression_manager
    self._tm = self._env.type_manager
    # map from (timing name, globality flag) to corresponding Timepoint;
    # "global" timings refer to the whole plan, non-global ones to the enclosing action
    self._timings = {
        (TK_START, True): GlobalStartTiming(),
        (TK_START, False): StartTiming(),
        (TK_END, True): GlobalEndTiming(),
        (TK_END, False): EndTiming(),
    }
    # map from an ANML operator token to the ExpressionManager factory that builds it
    self._operators: Dict[str, Callable] = {
        TK_AND: self._em.And,
        TK_OR: self._em.Or,
        TK_NOT: self._em.Not,
        # xor(x, y) expressed as (x or y) and (not x or not y)
        TK_XOR: (
            lambda x, y: self._em.And(
                self._em.Or(x, y), self._em.Or(self._em.Not(x), self._em.Not(y))
            )
        ),
        TK_IMPLIES: self._em.Implies,
        TK_GE: self._em.GE,
        TK_LE: self._em.LE,
        TK_GT: self._em.GT,
        TK_LT: self._em.LT,
        TK_EQUALS: self._em.Equals,
        TK_NOT_EQUALS: (lambda x, y: self._em.Not(self._em.Equals(x, y))),
        TK_PLUS: self._em.Plus,
        TK_MINUS: self._em.Minus,
        TK_DIV: self._em.Div,
        TK_TIMES: self._em.Times,
        TK_FORALL: self._em.Forall,
        TK_EXISTS: self._em.Exists,
    }
def _parse_problem(
    self, grammar: ANMLGrammar, problem_name: Optional[str]
) -> "up.model.Problem":
    """Build a up.model.Problem from the data structures populated in *grammar*.

    :param grammar: The ANMLGrammar already run over the input text.
    :param problem_name: The name to give to the created Problem (may be None).
    :return: The fully populated Problem.
    :raises ANMLSyntaxError: If a fluent declared constant is modified.
    """
    # Boolean fluents default to False unless the ANML text says otherwise.
    self._problem = up.model.Problem(
        problem_name,
        self._env,
        initial_defaults={self._tm.BoolType(): self._em.FALSE()},
    )
    types_map: Dict[str, "up.model.Type"] = self._create_types_map(grammar.types)
    # Fluents declared as ANML constants; collected so we can check at the end
    # that nothing in the problem ever modifies them.
    self._constant_fluents: Set["up.model.Fluent"] = set()
    for fluent_res in grammar.constant_fluents:
        (up_fluent, initial_default) = self._parse_fluent(fluent_res, types_map)
        if initial_default is not None:
            self._problem.add_fluent(
                up_fluent, default_initial_value=initial_default
            )
        else:
            self._problem.add_fluent(up_fluent)
        self._constant_fluents.add(up_fluent)
    for fluent_res in grammar.fluents:
        (up_fluent, initial_default) = self._parse_fluent(fluent_res, types_map)
        if initial_default is not None:
            self._problem.add_fluent(
                up_fluent, default_initial_value=initial_default
            )
        else:
            self._problem.add_fluent(up_fluent)
    for objects_res in grammar.objects:
        up_objects = self._parse_objects(objects_res, types_map)
        self._problem.add_objects(up_objects)
    for action_res in grammar.actions:
        up_action = self._parse_action(action_res, types_map)
        self._problem.add_action(up_action)
    # params is used in the expression parsing not to recreate an empty dict every time
    params: Dict[str, "Parameter"] = {}
    # global_start and global_end are used to check if the interval is at the beginning or at the end
    global_start = GlobalStartTiming()
    global_end = GlobalEndTiming()
    # Single timed statements outside any action become goals or effects.
    for interval_and_expression in grammar.timed_assignment_or_goal:
        self._add_goal_or_effect_to_problem(
            interval_and_expression[0],
            interval_and_expression[1],
            params,
            types_map,
            global_start,
            global_end,
        )
    # Expression blocks: one interval shared by several statements.
    for block_res in grammar.timed_assignments_or_goals:
        interval_and_expressions_res = block_res[0]
        assert (
            len(block_res) == 1 and len(interval_and_expressions_res) == 2
        ), "parsing error"
        for expression in interval_and_expressions_res[1]:
            # 'when' inside a block is explicitly unsupported (see class docs).
            if expression[0] == TK_WHEN:
                raise UPUnsupportedProblemTypeError(
                    "Conditional effects inside an expression block are not supported"
                )
            self._add_goal_or_effect_to_problem(
                interval_and_expressions_res[0],
                expression,
                params,
                types_map,
                global_start,
                global_end,
            )
    # check that every fluent defined as a constant is a static fluent
    static_fluents = self._problem.get_static_fluents()
    for constant_fluent in self._constant_fluents:
        if constant_fluent not in static_fluents:
            raise ANMLSyntaxError(
                f"The constant {constant_fluent} is modified in the problem."
            )
    return self._problem
# [docs]
def parse_problem(
    self,
    problem_filename: Union[str, Sequence[str]],
    problem_name: Optional[str] = None,
) -> "up.model.Problem":
    """
    Takes in input a filename containing an `ANML` problem and returns the parsed `Problem`.
    Check the class documentation for the assumptions made for this parser to work.
    :param problem_filename: The path to the file containing the `ANML` problem
        or to the files to concatenate to obtain the complete problem.
    :param problem_name: Optionally, the name to give to the created problem; if it is None,
        `problem_filename` will be set as the problem name.
    :return: The `Problem` parsed from the given anml file.
    """
    # Build the grammar and let pyparsing populate its data structures from the file(s).
    grammar = ANMLGrammar()
    parse_file(grammar.problem, problem_filename, parse_all=True)
    # Derive a problem name from the filename(s) when none was given.
    if problem_name is None:
        problem_name = (
            problem_filename
            if isinstance(problem_filename, str)
            else "_".join(problem_filename)
        )
    self._problem = self._parse_problem(grammar, problem_name)
    return self._problem
# [docs]
def parse_problem_string(
    self, problem_str: str, problem_name: Optional[str] = None
) -> "up.model.Problem":
    """
    Takes in input a string representing an `ANML` problem and returns the parsed `Problem`.
    Check the class documentation for the assumptions made for this parser to work.
    :param problem_str: The string representing the `ANML` problem.
    :param problem_name: Optionally, the name to give to the created problem.
    :return: The `Problem` parsed from the given anml file.
    """
    # Build the grammar and let pyparsing populate its data structures from the string.
    grammar = ANMLGrammar()
    parse_string(grammar.problem, problem_str, parse_all=True)
    self._problem = self._parse_problem(grammar, problem_name)
    return self._problem
def _create_types_map(self, types_res) -> Dict[str, "up.model.Type"]:
    """Create every UserType declared in the ANML text.

    The type hierarchy is topologically sorted so that a father type is always
    created before its children, an ordering the ANML grammar does not grant.

    :param types_res: The parsed type declarations.
    :return: Map from type name to the created UP Type.
    :raises ANMLSyntaxError: If a name is used as supertype but never declared.
    """
    # Names that are the subject of an explicit declaration; every type must
    # be declared explicitly, not only mentioned as a supertype.
    declared: Set[str] = set()
    # Graph with an edge from father to son; each declaration's supertype list
    # is chained (grandparent -> parent -> type).
    hierarchy = nx.DiGraph()
    for type_res in types_res:
        tn = type_res["name"]
        declared.add(tn)
        hierarchy.add_node(tn)
        child = tn
        for supertype in type_res["supertypes"]:
            assert isinstance(
                supertype, str
            ), f"parsing error, type {type_res} is not valid"
            hierarchy.add_node(supertype)
            hierarchy.add_edge(supertype, child)
            child = supertype
    created: Dict[str, "up.model.Type"] = {}
    for tn in nx.topological_sort(hierarchy):
        if tn not in declared:
            raise ANMLSyntaxError(
                f"The type {tn} is used in the type hierarchy but is never defined"
            )
        fathers = list(hierarchy.predecessors(tn))
        if not fathers:
            father = None
        elif len(fathers) == 1:
            father = created[fathers[0]]
        else:
            raise NotImplementedError(
                f"The type {tn} has more than one father. Currently this is not supported in the UP."
            )
        created[tn] = self._tm.UserType(tn, father)
    return created
def _add_goal_or_effect_to_problem(
    self,
    interval: ParseResults,
    expression: ParseResults,
    parameters: Dict[str, "Parameter"],
    types_map: Dict[str, "Type"],
    global_start: "Timing",
    global_end: "Timing",
):
    """Add a timed statement found outside of any action to the problem.

    The statement is classified by the tokens it contains: assignment,
    increase, decrease or 'when' makes it an effect; otherwise it is a goal.

    :param interval: The parsed interval the statement refers to.
    :param expression: The parsed statement itself.
    :param parameters: Parameters in scope (an empty, reused dict here).
    :param types_map: Map from type name to the corresponding UP Type.
    :param global_start: Cached GlobalStartTiming, used for comparison.
    :param global_end: Cached GlobalEndTiming, used for comparison.
    :raises ANMLSyntaxError: If the 'duration' keyword appears here.
    """
    # Scan the parse tree once for the tokens that discriminate the statement kind.
    relevant_words = {TK_DURATION, TK_ASSIGN, *TKS_INCREASE, *TKS_DECREASE, TK_WHEN}
    found_words = find_strings(expression, relevant_words)
    f_duration = TK_DURATION in found_words
    f_when = TK_WHEN in found_words
    f_assign = TK_ASSIGN in found_words
    f_increase = any(tk_increase in found_words for tk_increase in TKS_INCREASE)
    f_decrease = any(tk_decrease in found_words for tk_decrease in TKS_DECREASE)
    if f_duration:
        raise ANMLSyntaxError("duration keyword can't be used outside of an action")
    if f_when or f_assign or f_increase or f_decrease:  # effect
        interval_and_exp = ParseResults([interval, expression])
        up_timing, up_effect = self._parse_assignment(
            interval_and_exp,
            parameters,
            types_map,
            is_global=True,
        )
        # An unconditional assignment at global start is an initial value;
        # anything else becomes a timed effect of the problem.
        if (
            up_timing == global_start
            and not up_effect.is_conditional()
            and up_effect.is_assignment()
        ):
            if up_effect.is_forall():
                # Expand the forall into one initial value per grounding.
                for e in up_effect.expand_effect(self._problem):
                    self._problem.set_initial_value(e.fluent, e.value)
            else:
                self._problem.set_initial_value(up_effect.fluent, up_effect.value)
        else:
            self._problem._add_effect_instance(up_timing, up_effect)
    else:  # condition
        up_interval = self._parse_interval(
            interval, parameters, types_map, is_global=True
        )
        goal = self._parse_expression(expression, parameters, types_map)
        # A condition at the global end is a final-state goal; any other
        # interval becomes a timed goal.
        if up_interval == global_end:
            self._problem.add_goal(goal)
        else:
            self._problem.add_timed_goal(up_interval, goal)
def _parse_type_reference(
    self, type_res: ParseResults, types_map: Dict[str, "up.model.Type"]
) -> "up.model.Type":
    """Resolve a parsed type reference into a UP Type.

    Handles the builtin boolean, (optionally bounded) integer and float
    types, and user types looked up in *types_map*.

    :param type_res: The parsed type reference; element 0 is the type name,
        the optional element 1 holds the [lower, upper] bound expressions.
    :param types_map: Map from declared type name to the created UP Type.
    :return: The corresponding UP Type.
    :raises ANMLSyntaxError: On non-constant bounds, real bounds on an
        integer type, or an undeclared user type.
    """
    name = type_res[0]
    assert isinstance(name, str), "parsing error"
    if name == TK_BOOLEAN:
        return self._tm.BoolType()
    elif name in (TK_INTEGER, TK_FLOAT):
        # None means the bound is left unbounded (the 'infinity' token).
        lower_bound, upper_bound = None, None
        if len(type_res) == 2:
            _p: Dict[str, "up.model.Parameter"] = {}
            interval = type_res[1]
            lb_exp = interval[0]
            if lb_exp != TK_INFINITY:
                # Bounds must simplify to numeric constants.
                lower_bound_exp = self._parse_expression(
                    lb_exp, parameters=_p, types_map=types_map
                )
                if (
                    lower_bound_exp.is_int_constant()
                    or lower_bound_exp.is_real_constant()
                ):
                    lower_bound = lower_bound_exp.constant_value()
                else:
                    raise ANMLSyntaxError(
                        f"bounds of type {type_res} must be integer or real constants"
                    )
            ub_exp = interval[1]
            if ub_exp != TK_INFINITY:
                upper_bound_exp = self._parse_expression(
                    ub_exp, parameters=_p, types_map=types_map
                )
                if (
                    upper_bound_exp.is_int_constant()
                    or upper_bound_exp.is_real_constant()
                ):
                    upper_bound = upper_bound_exp.constant_value()
                else:
                    raise ANMLSyntaxError(
                        f"bounds of type {type_res} must be integer or real constants"
                    )
        else:
            assert len(type_res) == 1, "Parse error"
        if name == TK_INTEGER:
            # Fractional bounds are not allowed on an integer type.
            if isinstance(lower_bound, Fraction) or isinstance(
                upper_bound, Fraction
            ):
                raise ANMLSyntaxError(
                    f"Integer bounds of {type_res} must be int expressions"
                )
            return self._tm.IntType(lower_bound, upper_bound)
        else:
            # RealType expects Fraction bounds; promote int bounds.
            if isinstance(lower_bound, int):
                lower_bound = Fraction(lower_bound)
            if isinstance(upper_bound, int):
                upper_bound = Fraction(upper_bound)
            return self._tm.RealType(lower_bound, upper_bound)
    else:
        ret_type = types_map.get(name, None)
        if ret_type is not None:
            return ret_type
        else:
            raise ANMLSyntaxError(
                f"UserType {name} is referenced but never defined."
            )
def _parse_fluent(
    self, fluent_res: ParseResults, types_map: Dict[str, "up.model.Type"]
) -> Tuple["up.model.Fluent", Optional["FNode"]]:
    """Build a Fluent from its parse tree.

    :param fluent_res: Parsed fluent declaration (type, name, parameters,
        optional 'init' expression).
    :param types_map: Map from type name to the corresponding UP Type.
    :return: The created Fluent and its default initial value (None if the
        declaration has no initializer).
    """
    up_type = self._parse_type_reference(fluent_res["type"], types_map)
    signature: "OrderedDict[str, up.model.Type]" = self._parse_parameters_def(
        fluent_res["parameters"], types_map
    )
    default: Optional["FNode"] = None
    if "init" in fluent_res:
        default = self._parse_expression(fluent_res["init"], {}, types_map)
    fluent = up.model.Fluent(fluent_res["name"], up_type, _signature=signature)
    return fluent, default
def _parse_objects(
    self, objects_res: ParseResults, types_map: Dict[str, "up.model.Type"]
) -> List["up.model.Object"]:
    """Create one UP Object per declared name, all of the declared type.

    :param objects_res: Parsed objects declaration (type + names).
    :param types_map: Map from type name to the corresponding UP Type.
    :return: The list of created Objects.
    """
    obj_type = self._parse_type_reference(objects_res["type"], types_map)
    names = objects_res["names"]
    for name in names:
        assert isinstance(name, str), "parsing error"
    return [up.model.Object(name, obj_type) for name in names]
def _parse_action(
    self, action_res: ParseResults, types_map: Dict[str, "up.model.Type"]
) -> "up.model.Action":
    """Create a DurativeAction from its parse tree and populate its body.

    :param action_res: Parsed action declaration (name, parameters, body).
    :param types_map: Map from type name to the corresponding UP Type.
    :return: The created DurativeAction.
    """
    action_name = action_res["name"]
    assert isinstance(action_name, str), "parsing error"
    signature = self._parse_parameters_def(action_res["parameters"], types_map)
    new_action = up.model.DurativeAction(action_name, _parameters=signature)
    # Expose the action's own Parameter objects by name for expression parsing.
    params_by_name: Dict[str, "up.model.Parameter"] = {
        p: new_action.parameter(p) for p in signature
    }
    self._populate_parsed_action_body(
        new_action, action_res["body"], params_by_name, types_map
    )
    return new_action
def _populate_parsed_action_body(
    self,
    action: "up.model.DurativeAction",
    action_body_res: ParseResults,
    action_parameters: Dict[str, "up.model.Parameter"],
    types_map: Dict[str, "Type"],
) -> None:
    """Parse every statement of an action body into *action*.

    Each statement is classified by the tokens it contains as a duration
    constraint, an effect or a condition, and added to *action* accordingly.

    :param action: The DurativeAction being built (mutated in place).
    :param action_body_res: The parse tree of the action body.
    :param action_parameters: The action's parameters, by name.
    :param types_map: Map from type name to the corresponding UP Type.
    """
    for interval_and_exp in action_body_res:
        # Tokens that discriminate the statement kind.
        relevant_words = {
            TK_DURATION,
            TK_ASSIGN,
            *TKS_INCREASE,
            *TKS_DECREASE,
            TK_WHEN,
        }
        found_words = find_strings(interval_and_exp, relevant_words)
        f_duration = TK_DURATION in found_words
        f_when = TK_WHEN in found_words
        f_assign = TK_ASSIGN in found_words
        f_increase = any(tk_increase in found_words for tk_increase in TKS_INCREASE)
        f_decrease = any(tk_decrease in found_words for tk_decrease in TKS_DECREASE)
        if f_duration:  # handle duration
            # Duration statements must not be mixed with conditional or
            # increase/decrease tokens (see class documentation).
            if f_when:
                raise ANMLSyntaxError(
                    "expressions containing the duration can't be conditional"
                )
            if f_increase:
                raise ANMLSyntaxError(
                    "expressions containing the duration can't be increase"
                )
            if f_decrease:
                raise ANMLSyntaxError(
                    "expressions containing the duration can't be decrease"
                )
            duration_exp = interval_and_exp[1][0]
            if f_assign:  # duration assignment: 'duration := exp;'
                if (
                    duration_exp[0][0] != TK_DURATION
                    or duration_exp[1] != TK_ASSIGN
                ):
                    raise UPUnsupportedProblemTypeError(
                        "An expression contains the duration keyword and the assignment operand "
                        + "but it's not in the supported form: 'duration := exp;'"
                    )
                action.set_duration_constraint(
                    FixedDuration(
                        self._parse_expression(
                            duration_exp[2], action_parameters, types_map
                        )
                    )
                )
            else:
                if duration_exp[1] == TK_EQUALS:  # duration == exp
                    if duration_exp[0][0] != TK_DURATION:
                        raise UPUnsupportedProblemTypeError(
                            "An expression contains the duration keyword and the equals operand "
                            + "but it's not in the supported form: 'duration == exp;'"
                        )
                    action.set_duration_constraint(
                        FixedDuration(
                            self._parse_expression(
                                duration_exp[2], action_parameters, types_map
                            )
                        )
                    )
                else:
                    # Bounded duration: 'duration CT exp and duration CT exp'.
                    if duration_exp[1] != TK_AND:
                        raise UPUnsupportedProblemTypeError(
                            "An expression that contains the duration keyword and no equals or assignment operand "
                            + "is supported only if the top level operand is an and. The supported form is: 'duration < exp and duration >= exp;'"
                        )
                    left_bound = duration_exp[0]
                    right_bound = duration_exp[2]
                    if (
                        left_bound[0][0] != TK_DURATION
                        or right_bound[0][0] != TK_DURATION
                    ):
                        raise UPUnsupportedProblemTypeError(
                            "A duration expression in the form: 'duration < exp and duration >= exp;' is only "
                            + "supported with the duration on the left side of the relational operator."
                        )
                    # Normalize so that left_bound carries the lower bound (>/>=)
                    # and right_bound the upper bound (</<=).
                    if left_bound[1] not in (TK_GT, TK_GE):
                        left_bound, right_bound = right_bound, left_bound
                    if left_bound[1] not in (TK_GT, TK_GE):
                        raise UPUnsupportedProblemTypeError(
                            "duration expression in the form: 'duration > exp and duration < exp;' detected, "
                            + f"but was found {left_bound[1]} instead of '>/>='."
                        )
                    if right_bound[1] not in (TK_LT, TK_LE):
                        raise UPUnsupportedProblemTypeError(
                            "duration expression in the form: 'duration > exp and duration < exp;' detected, "
                            + f"but was found {right_bound[1]} instead of '</<='."
                        )
                    # Strict comparisons produce open interval ends.
                    is_left_open = left_bound[1] == TK_GT
                    is_right_open = right_bound[1] == TK_LT
                    l_bound_exp = self._parse_expression(
                        left_bound[2], action_parameters, types_map
                    )
                    r_bound_exp = self._parse_expression(
                        right_bound[2], action_parameters, types_map
                    )
                    action.set_duration_constraint(
                        DurationInterval(
                            l_bound_exp, r_bound_exp, is_left_open, is_right_open
                        )
                    )
            # end handle duration
        elif f_assign or f_increase or f_decrease:  # handle effects
            up_timing, up_effect = self._parse_assignment(
                interval_and_exp, action_parameters, types_map
            )
            action._add_effect_instance(up_timing, up_effect)
            # end handle effects
        else:  # handle condition
            if f_when:
                raise UPUnsupportedProblemTypeError(
                    "when keyword used in an action precondition is not supported by the UP."
                )
            up_interval = self._parse_interval(
                interval_and_exp[0], action_parameters, types_map
            )
            exp_res = interval_and_exp[1][0]
            condition = self._parse_expression(
                exp_res, action_parameters, types_map
            )
            action.add_condition(up_interval, condition)
def _parse_parameters_def(
    self, parameters_res: ParseResults, types_map: Dict[str, "Type"]
) -> "OrderedDict[str, Type]":
    """Parse a parameters declaration list into an ordered name -> Type map.

    :param parameters_res: Sequence of (type reference, name) pairs.
    :param types_map: Map from type name to the corresponding UP Type.
    :return: OrderedDict preserving the declaration order.
    """
    result: "OrderedDict[str, Type]" = OrderedDict()
    for param_res in parameters_res:
        type_ref, param_name = param_res[0], param_res[1]
        assert isinstance(param_name, str)
        result[param_name] = self._parse_type_reference(type_ref, types_map)
    return result
def _parse_interval(
    self,
    interval_res: ParseResults,
    parameters: Dict[str, "Parameter"],
    types_map: Dict[str, "Type"],
    is_global: bool = False,
    is_constant: bool = False,
) -> Union["Timing", "TimeInterval"]:
    """Convert a parsed ANML interval into a UP Timing or TimeInterval.

    :param interval_res: The parsed interval; empty, '[exp]'/'(exp)' (point)
        or '[exp, exp]' (and the open variants).
    :param parameters: Parameters in scope for the contained expressions.
    :param types_map: Map from type name to the corresponding UP Type.
    :param is_global: True when the interval is outside of any action.
    :param is_constant: True when the statement targets a constant fluent.
    :return: A Timing for point intervals, a TimeInterval otherwise.
    """
    if len(interval_res) == 0:
        # No interval written in the ANML text.
        if is_global:
            if is_constant:
                # Constant fluent initialization happens at global start.
                return GlobalStartTiming()
            raise UPUnsupportedProblemTypeError(
                "ANML constant initialization is currently not supported."
            )
        else:
            # Inside an action the implicit interval is the action start.
            return StartTiming()
    contains_TK_ALL = TK_ALL in find_strings(interval_res, set((TK_ALL,)))
    if len(interval_res) == 3:  # point interval: bracket, expression, bracket
        l_par = interval_res[0]
        timing_exp = interval_res[1]
        r_par = interval_res[2]
        assert l_par in (TK_L_PARENTHESIS, TK_L_BRACKET) and r_par in (
            TK_R_PARENTHESIS,
            TK_R_BRACKET,
        ), "parsing error"
        if contains_TK_ALL:  # prepare start and end, used later
            start, end = (
                (GlobalStartTiming(), GlobalEndTiming())
                if is_global
                else (StartTiming(), EndTiming())
            )
            if len(timing_exp) != 1:  # TODO probably add additional check on all
                raise UPUnsupportedProblemTypeError(
                    f"with the {TK_ALL} timing no expression is accepted"
                )
        else:
            # Point intervals are only meaningful with closed brackets.
            if l_par != TK_L_BRACKET:
                raise ANMLSyntaxError(
                    "point intervals can't have '('; use '[' instead"
                )
            if r_par != TK_R_BRACKET:
                raise ANMLSyntaxError(
                    "point intervals can't have ')'; use ']' instead"
                )
        # NOTE(review): in the TK_ALL branch `start`/`end` are computed but
        # control falls through to _parse_timing below, which cannot handle the
        # 'all' token; a `return up.model.ClosedTimeInterval(start, end)` may
        # have been lost here — confirm against the upstream sources.
        return self._parse_timing(timing_exp, parameters, types_map, is_global)
    else:
        assert len(interval_res) == 4, "Parsing error, not able to handle"
        # Full interval: bracket, start expression, end expression, bracket;
        # '(' / ')' make the corresponding end open.
        l_par = interval_res[0]
        start = self._parse_timing(
            interval_res[1], parameters, types_map, is_global
        )
        end = self._parse_timing(interval_res[2], parameters, types_map, is_global)
        r_par = interval_res[3]
        if l_par == TK_L_BRACKET and r_par == TK_R_BRACKET:
            return up.model.ClosedTimeInterval(start, end)
        elif l_par == TK_L_BRACKET:
            return up.model.RightOpenTimeInterval(start, end)
        elif r_par == TK_R_BRACKET:
            return up.model.LeftOpenTimeInterval(start, end)
        return up.model.OpenTimeInterval(start, end)
def _parse_timing(
    self,
    timing_exp_res: ParseResults,
    parameters: Dict[str, "Parameter"],
    types_map: Dict[str, "Type"],
    is_global: bool = False,
) -> "up.model.Timing":
    """Convert a parsed timing expression into a UP Timing.

    Supported forms are: 'start'/'end', a non-negative numeric constant
    (global context only), 'start + constant' and 'end - constant'.

    :param timing_exp_res: The parsed timing expression (single element).
    :param parameters: Parameters in scope for expression parsing.
    :param types_map: Map from type name to the corresponding UP Type.
    :param is_global: True outside of any action.
    :return: The corresponding Timing.
    """
    assert len(timing_exp_res) == 1, "parsing error"
    # Simplification folds constant arithmetic so the shape checks below work.
    parsed_timing_exp = self._parse_expression(
        timing_exp_res, parameters, types_map, is_global
    ).simplify()
    if parsed_timing_exp.is_timing_exp():
        # Plain 'start' or 'end'.
        return parsed_timing_exp.timing()
    elif (
        parsed_timing_exp.is_int_constant() or parsed_timing_exp.is_real_constant()
    ):
        # A bare constant is an absolute time, valid only at problem scope.
        if not is_global:
            raise ANMLSyntaxError(
                f"Interval without start or end outside of an action is not valid."
            )
        delay = parsed_timing_exp.constant_value()
        if delay < 0:
            raise UPUnsupportedProblemTypeError(
                f"Implicit start interval has delay {delay}, which is negative. UP currently only supports positive delays."
            )
        return GlobalStartTiming(delay)
    elif parsed_timing_exp.is_plus() or parsed_timing_exp.is_minus():
        if len(parsed_timing_exp.args) != 2:
            raise UPUnsupportedProblemTypeError(
                f"Timing parsed: {parsed_timing_exp}. UP currently only supports",
                f" {TK_START} + constant, {TK_END} - constant or just a constant",
            )
        timing_exp = parsed_timing_exp.arg(0)
        delay_exp = parsed_timing_exp.arg(1)
        # Addition is commutative: accept 'constant + start' by swapping.
        if parsed_timing_exp.is_plus() and not timing_exp.is_timing_exp():
            timing_exp, delay_exp = delay_exp, timing_exp
        if not timing_exp.is_timing_exp() or (
            not delay_exp.is_int_constant() and not delay_exp.is_real_constant()
        ):
            raise UPUnsupportedProblemTypeError(
                f"Timing parsed: {parsed_timing_exp}. UP currently only supports",
                f" {TK_START} + constant, {TK_END} - constant or just a constant",
            )
        timing = timing_exp.timing()
        delay = delay_exp.constant_value()
        # Only 'start + d' and 'end - d' (d >= 0) are representable.
        if (
            parsed_timing_exp.is_plus()
            and (delay < 0 or not timing.is_from_start())
        ) or (
            parsed_timing_exp.is_minus() and (delay < 0 or not timing.is_from_end())
        ):
            raise UPUnsupportedProblemTypeError(
                f"Timing parsed: {parsed_timing_exp}. UP currently only supports",
                f" {TK_START} + constant, {TK_END} - constant or just a constant",
            )
        if parsed_timing_exp.is_minus():
            # 'end - d' becomes a negative delay from the end timepoint.
            delay = -delay
        return Timing(delay, timing.timepoint)
    else:
        raise UPUnsupportedProblemTypeError(
            f"Timing parsed: {parsed_timing_exp}. UP currently only supports",
            f" {TK_START} + constant, {TK_END} - constant or just a constant",
        )
def _check_conditional_intervals(
    self,
    interval_res: ParseResults,
    condition_interval: ParseResults,
    effect_interval: ParseResults,
) -> ParseResults:
    """Reconcile the intervals that can surround a conditional effect.

    ANML allows an interval outside the conditional block (*interval_res*),
    one before the condition (*condition_interval*) and one before the effect
    (*effect_interval*). The UP represents a conditional effect with a single
    interval, so every interval that is present must match the others.

    :param interval_res: The interval defined outside the conditional block.
    :param condition_interval: The interval defined before the condition.
    :param effect_interval: The interval defined before the effect.
    :return: The single interval to use (*interval_res* when none is given).
    :raises UPUnsupportedProblemTypeError: If two present intervals differ.
    """
    res = None
    if interval_res:
        res = interval_res
    if condition_interval and res is not None:
        # str() comparison: ParseResults objects have no structural equality.
        if str(condition_interval) != str(res):
            raise UPUnsupportedProblemTypeError(
                "In conditional effect parsing the "
                + "condition time interval and the time interval outside the block are different, "
                + "this is not supported by the UP."
            )
    elif condition_interval:
        res = condition_interval
    if effect_interval and res is not None:
        if str(effect_interval) != str(res):
            # Bug fix: this message previously duplicated the condition-interval
            # wording, misreporting which intervals actually disagreed.
            raise UPUnsupportedProblemTypeError(
                "In conditional effect parsing the "
                + "effect time interval and the other specified time intervals are different, "
                + "this is not supported by the UP."
            )
    elif effect_interval:
        res = effect_interval
    if res is None:
        return interval_res
    return res
def _parse_assignment(
    self,
    interval_and_effect: ParseResults,
    parameters: Dict[str, "up.model.Parameter"],
    types_map: Dict[str, "Type"],
    is_global: bool = False,
) -> Tuple["Timing", "Effect"]:
    """Parse an effect statement (possibly conditional and/or forall).

    :param interval_and_effect: Pair (interval parse tree, effect parse tree).
    :param parameters: Parameters in scope for expression parsing.
    :param types_map: Map from type name to the corresponding UP Type.
    :param is_global: True when the statement is outside of any action.
    :return: The Timing at which the effect applies and the built Effect.
    :raises ANMLSyntaxError: If the assignment target is not a fluent.
    :raises UPUnsupportedProblemTypeError: If the interval is durative.
    """
    interval_res = interval_and_effect[0]
    effect_res = interval_and_effect[1]
    if effect_res[0] == TK_WHEN:  # Conditional effect
        # Structure: when [cond_interval] cond_exp { [eff_interval] eff_exp }
        condition_interval = effect_res[1][0]
        condition_exp_res = effect_res[1][1]
        effect_interval = effect_res[2][0]
        effect_exp = effect_res[2][1][0]
        condition = self._parse_expression(condition_exp_res, parameters, types_map)
        interval_res = self._check_conditional_intervals(
            interval_res, condition_interval, effect_interval
        )
    else:
        condition = self._em.TRUE()
        effect_exp = effect_res[0]
    variables: Dict[str, "up.model.Variable"] = {}
    if effect_exp[0] == TK_FORALL:  # Forall assignment
        # Quantifier variables become the Effect's forall variables.
        variables = dict(
            (
                (n, Variable(n, t))
                for n, t in self._parse_parameters_def(
                    effect_exp["quantifier_variables"], types_map
                ).items()
            )
        )
        effect_exp = effect_exp[2]
        if effect_exp[0] == TK_WHEN:  # Conditional effect inside the forall
            assert len(effect_exp) == 3, "Multiple expressions in forall assignment"
            condition_interval = effect_exp[1][0]
            condition_exp_res = effect_exp[1][1]
            effect_interval = effect_exp[2][0]
            effect_exp = effect_exp[2][1][0]
            interval_res = self._check_conditional_intervals(
                interval_res, condition_interval, effect_interval
            )
            # Conjoin with any condition collected at the outer level.
            condition = self._em.And(
                self._parse_expression(
                    condition_exp_res, parameters, types_map, variables=variables
                ),
                condition,
            )
        else:
            effect_exp = effect_exp[0]
    # The effect core is: fluent_ref OP value.
    fluent_ref = effect_exp[0]
    assignment_operator = effect_exp[1]
    assigned_expression = effect_exp[2]
    up_fluent = self._parse_expression(
        fluent_ref, parameters, types_map, variables=variables
    )
    if not up_fluent.is_fluent_exp():
        raise ANMLSyntaxError("left side of the assignment is not a valid fluent")
    is_constant = up_fluent.fluent() in self._constant_fluents
    up_interval = self._parse_interval(
        interval_res, parameters, types_map, is_global, is_constant
    )
    up_value = self._parse_expression(
        assigned_expression, parameters, types_map, variables=variables
    )
    # Map the ANML operator token to the Effect kind.
    if assignment_operator == TK_ASSIGN:
        kind = EffectKind.ASSIGN
    elif assignment_operator in TKS_INCREASE:
        kind = EffectKind.INCREASE
    elif assignment_operator in TKS_DECREASE:
        kind = EffectKind.DECREASE
    else:
        raise NotImplementedError(
            f"Currently the unified planning does not support the assignment operator {assignment_operator}"
        )
    # Effects are instantaneous: a TimeInterval here is not representable.
    if not isinstance(up_interval, Timing):
        raise UPUnsupportedProblemTypeError(
            "An effect with a durative interval is not supported"
        )
    return (
        up_interval,
        Effect(
            up_fluent, up_value, condition, kind=kind, forall=variables.values()
        ),
    )
def _parse_expression(
    self,
    expression: Union[ParseResults, List, str],
    parameters: Dict[str, "up.model.Parameter"],
    types_map: Dict[str, "Type"],
    is_global: Optional[bool] = None,
    variables: Optional[Dict[str, "up.model.Variable"]] = None,
) -> "up.model.FNode":
    """Convert a parsed ANML expression into an FNode, iteratively.

    The parse tree is evaluated with an explicit stack instead of recursion:
    every node is visited twice — once unexpanded, to schedule its children,
    and once expanded, to combine the children's results found in `solved`.

    :param expression: The parse tree (or a single leaf token) to convert.
    :param parameters: Parameters visible to the expression, by name.
    :param types_map: Map from type name to the corresponding UP Type.
    :param is_global: None outside timed expressions; otherwise whether
        start/end refer to the plan (True) or the enclosing action (False).
    :param variables: Quantifier variables currently in scope, by name.
    :return: The simplified FNode equivalent to the expression.
    """
    # A string here means it is a number or a boolean token. To avoid code duplication, just
    # wrap it in a temporary list and let the stack management handle it.
    if isinstance(expression, str):
        expression = [expression]
    vars = {} if variables is None else variables
    # Each stack entry: (node, already_expanded flag, variables in scope for it).
    stack: List[
        Tuple[Union[ParseResults, List, str], bool, Dict[str, "Variable"]]
    ] = [(expression, False, vars)]
    solved: List[FNode] = []
    while len(stack) > 0:
        exp, already_expanded, vars = stack.pop()
        if already_expanded:
            # Second visit: children have been evaluated onto `solved`.
            assert isinstance(exp, ParseResults) or isinstance(exp, List)
            if len(exp) <= 1:
                assert len(exp) == 1, "algorithm error"
                pass  # a single child: its value is already on `solved`
            elif len(exp) == 2:
                first_elem = exp[0]
                if isinstance(first_elem, List) or isinstance(
                    first_elem, ParseResults
                ):  # parameters list
                    second_elem = exp[1]
                    assert isinstance(second_elem, List) or isinstance(
                        second_elem, ParseResults
                    )
                    pass
                elif first_elem == TK_MINUS:  # unary minus
                    solved.append(self._em.Times(-1, solved.pop()))
                elif first_elem == TK_PLUS:  # unary plus
                    pass  # already ok, nothing to do -> 0+x = x
                elif first_elem == TK_NOT:  # negation
                    solved.append(self._em.Not(solved.pop()))
                elif first_elem in (
                    TK_START,
                    TK_END,
                    TK_ALL,
                ):  # start, end or all timing_exp
                    if is_global is None:
                        raise ANMLSyntaxError(
                            f"{first_elem} is found outside of a timed expression."
                        )
                    assert (
                        first_elem != TK_ALL
                    ), "Error, this case should have been handled before."
                    solved.append(
                        self._em.TimingExp(self._timings[(first_elem, is_global)])
                    )
                elif first_elem in vars:  # quantifier variable
                    solved.append(self._em.VariableExp(vars[first_elem]))
                elif first_elem in parameters:  # parameter exp
                    solved.append(self._em.ParameterExp(parameters[first_elem]))
                elif self._problem.has_fluent(first_elem):  # fluent exp
                    fluent = self._problem.fluent(first_elem)
                    # The fluent's arguments were already evaluated onto `solved`.
                    fluent_args = tuple(solved.pop() for _ in range(fluent.arity))
                    solved.append(self._em.FluentExp(fluent, fluent_args))
                elif self._problem.has_object(first_elem):  # object exp
                    solved.append(
                        self._em.ObjectExp(self._problem.object(first_elem))
                    )
                else:
                    raise NotImplementedError(
                        f"Currently the UP does not support the expression {exp}"
                    )
            elif len(exp) == 3:  # binary operator or parameters list
                first_token = exp[0]
                if isinstance(first_token, str) and first_token in (
                    TK_FORALL,
                    TK_EXISTS,
                ):
                    assert isinstance(exp, ParseResults) or isinstance(exp, list)
                    # One solved expression per statement of the quantifier body.
                    quantified_expressions = [solved.pop() for _ in exp[2]]
                    solved.append(
                        self._operators[first_token](
                            self._em.And(quantified_expressions), *vars.values()
                        )
                    )
                else:
                    operator = exp[1]
                    if isinstance(operator, List):  # parameters list
                        assert isinstance(exp[0], List) and isinstance(exp[2], List)
                        pass
                    elif isinstance(operator, str):  # binary operator
                        # '==' needs special care, because in ANML it can both mean '==' or 'Iff',
                        # but in the UP those 2 cases are handled differently.
                        if operator == TK_EQUALS:
                            first_arg = solved.pop()
                            second_arg = solved.pop()
                            if first_arg.type.is_bool_type():
                                solved.append(self._em.Iff(first_arg, second_arg))
                            else:
                                solved.append(
                                    self._em.Equals(first_arg, second_arg)
                                )
                        else:
                            func = self._operators.get(operator, None)
                            if func is not None:
                                first_arg = solved.pop()
                                second_arg = solved.pop()
                                solved.append(func(first_arg, second_arg))
                            else:
                                raise NotImplementedError(
                                    f"Currently the UP does not support the parsing of the {operator} operator."
                                )
                    else:
                        raise NotImplementedError(
                            f"Currently the UP does not support the expression {exp}"
                        )
            else:  # expression longer than 3, must be a parameters list
                for e in exp:
                    assert isinstance(e, List) or isinstance(
                        e, ParseResults
                    ), f"expression {exp} is expected to be a parameters list, but it's not"
                pass
        else:  # not solved
            if isinstance(exp, ParseResults) or isinstance(exp, List):
                first_token = exp[0]
                if isinstance(first_token, str) and first_token in (
                    TK_FORALL,
                    TK_EXISTS,
                ):  # quantifier
                    assert isinstance(exp, ParseResults) or isinstance(
                        exp, list
                    ), f"{exp} <- {expression}, {type(exp)}"
                    # Declare the quantifier variables and make them visible
                    # (only) to the body expressions pushed below.
                    name_type = self._parse_parameters_def(exp[1], types_map)
                    new_vars = {n: Variable(n, t) for n, t in name_type.items()}
                    stack.append((exp, True, new_vars))
                    all_vars = vars.copy()
                    all_vars.update(new_vars)
                    for e in exp[2]:
                        stack.append((e, False, all_vars))
                else:
                    # Re-push expanded, then push the children to be solved first.
                    stack.append((exp, True, vars))
                    for e in exp:
                        if not isinstance(e, str):  # nested structure
                            if len(e) > 0:
                                stack.append((e, False, vars))
                        else:
                            assert isinstance(e, str)
                            # Only literal leaves (numbers/booleans) are pushed;
                            # operator tokens and names are handled on expansion.
                            if (
                                e.isnumeric()
                                or is_float(e)
                                or e == TK_TRUE
                                or e == TK_FALSE
                            ):
                                stack.append((e, False, vars))
            elif isinstance(exp, str):
                # Leaf token: a numeric or boolean literal.
                if exp.isnumeric():  # int
                    solved.append(self._em.Int(int(exp)))
                elif is_float(exp):  # float
                    solved.append(self._em.Real(Fraction(float(exp))))
                elif exp == TK_TRUE:  # true
                    solved.append(self._em.TRUE())
                elif exp == TK_FALSE:  # false
                    solved.append(self._em.FALSE())
                else:
                    raise ANMLSyntaxError(f"Unable to solve {exp}")
            else:
                raise NotImplementedError
    assert len(stack) == 0
    # A well-formed expression reduces to exactly one result.
    assert len(solved) == 1, f"{expression}"
    res = solved.pop()
    return res.simplify()
def is_float(string: str) -> bool:
    """Return True when *string* can be parsed as a Python float."""
    try:
        float(string)
    except ValueError:
        return False
    return True
def find_strings(result: Union[ParseResults, List], strings: Set[str]) -> Set[str]:
    """Depth-first scan of a nested parse tree for the given tokens.

    :param result: The (possibly nested) parse tree to scan.
    :param strings: The set of tokens to look for.
    :return: The subset of *strings* actually found; the scan stops early
        once every token has been seen.
    """
    found: Set[str] = set()
    pending: List[Union[ParseResults, List]] = [result]
    while pending:
        node = pending.pop()
        for item in node:
            if isinstance(item, (ParseResults, List)):
                pending.append(item)
            else:
                assert isinstance(item, str)
                if item in strings:
                    found.add(item)
            # Early exit: nothing more can be added once all tokens are found.
            if found == strings:
                return found
    return found