Source code for unified_planning.io.pddl_reader

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import re
from typing import Callable, List
import typing
from warnings import warn
import unified_planning as up
from unified_planning.environment import Environment, get_environment
from unified_planning.exceptions import (
    UPUnsupportedProblemTypeError,
)
from unified_planning.interop.from_pddl import (
    check_ai_pddl_requirements,
    convert_problem_from_ai_pddl,
)
from unified_planning.io.up_pddl_reader import UPPDDLReader

from pddl.parser.domain import DomainParser  # type: ignore
from pddl.parser.problem import ProblemParser  # type: ignore


class PDDLReader:
    """
    Parses a `PDDL` domain (and optionally a `PDDL` problem) and generates the
    equivalent :class:`~unified_planning.model.Problem`.

    When the requirements declared in the domain pass
    `check_ai_pddl_requirements`, the AI Planning `pddl` parser is used together
    with `convert_problem_from_ai_pddl`; otherwise (or when that route fails and
    the fallback is active) the `UPPDDLReader` is used.

    Note: in the error report messages, a tabulation counts as one column; and due
    to PDDL case-insensitivity, everything in the PDDL files will be turned to
    lower case, so the names of fluents, actions etc. and the error report will
    all be in lower-case.
    """

    def __init__(
        self,
        environment: typing.Optional[Environment] = None,
        force_up_pddl_reader: bool = False,
        force_ai_planning_reader: bool = False,
        deactivate_fallback: bool = False,
        disable_warnings: bool = False,
    ):
        """
        Creates the `PDDLReader` with the specified parameters.

        :param environment: the environment used to create the problems/plans,
            defaults to None
        :param force_up_pddl_reader: when `True` forces the `parse_problem` methods
            to use the `UPPDDLReader`, defaults to False
        :param force_ai_planning_reader: when `True` forces `parse_problem` methods
            to use the AI Planning parser and converter, defaults to False
        :param deactivate_fallback: when `True` disables the fallback on the
            `UPPDDLReader` if the AI Planning route fails, defaults to False
        :param disable_warnings: when `True` the warnings emitted when the AI
            Planning route fails but the requirements are respected are not
            raised, defaults to False
        """
        self._env = get_environment(environment)
        self._up_pddl_reader = (
            UPPDDLReader(self._env) if force_up_pddl_reader else None
        )  # Lazy initialization
        self._force_up_pddl_reader = force_up_pddl_reader
        self._force_ai_planning_reader = force_ai_planning_reader
        self._deactivate_fallback = deactivate_fallback
        self._disable_warnings = disable_warnings

    def _get_up_pddl_reader(self) -> UPPDDLReader:
        """Returns the `UPPDDLReader`, creating it on first use."""
        if self._up_pddl_reader is None:
            self._up_pddl_reader = UPPDDLReader(self._env)
        return self._up_pddl_reader

    def parse_problem(
        self, domain_filename: str, problem_filename: typing.Optional[str] = None
    ) -> "up.model.Problem":
        """
        Takes in input a filename containing the `PDDL` domain and optionally a
        filename containing the `PDDL` problem and returns the parsed `Problem`.
        The files are read and the work is delegated to `parse_problem_string`.

        Note: if the `problem_filename` is `None`, an incomplete `Problem` will be
        returned.

        Note: due to PDDL case-insensitivity, everything in the PDDL files will be
        turned to lower case, so the names of fluents, actions etc. and the error
        report will all be in lower-case.

        :param domain_filename: The path to the file containing the `PDDL` domain.
        :param problem_filename: Optionally the path to the file containing the
            `PDDL` problem.
        :return: The `Problem` parsed from the given pddl domain + problem.
        """
        # "utf-8-sig" transparently drops a leading BOM if the file has one.
        with open(domain_filename, encoding="utf-8-sig") as domain_file:
            domain_str = domain_file.read()
        problem_str = None
        if problem_filename is not None:
            with open(problem_filename, encoding="utf-8-sig") as problem_file:
                problem_str = problem_file.read()
        return self.parse_problem_string(domain_str, problem_str)

    def parse_problem_string(
        self, domain_str: str, problem_str: typing.Optional[str] = None
    ) -> "up.model.Problem":
        """
        Takes in input a str representing the `PDDL` domain and optionally a str
        representing the `PDDL` problem and returns the parsed `Problem`.

        The AI Planning parser/converter is used if all the requirements specified
        in the domain are supported; if this fails or the requirements specified
        are not supported, falls back to the `UPPDDLReader`.

        Note: if the `problem_str` is `None`, an incomplete `Problem` will be
        returned.

        Note: due to PDDL case-insensitivity, everything in the PDDL files will be
        turned to lower case, so the names of fluents, actions etc. and the error
        report will all be in lower-case.

        :param domain_str: The string representing the `PDDL` domain.
        :param problem_str: Optionally the string representing the `PDDL` problem.
        :return: The `Problem` parsed from the given pddl domain + problem.
        :raises Exception: re-raises whatever the AI Planning parser or converter
            raised when `deactivate_fallback` is `True`.
        """
        if self._force_up_pddl_reader:
            assert self._up_pddl_reader is not None
            return self._up_pddl_reader.parse_problem_string(domain_str, problem_str)

        if self._force_ai_planning_reader:
            ai_domain = DomainParser()(domain_str)
            ai_problem = (
                ProblemParser()(problem_str) if problem_str is not None else None
            )
            return convert_problem_from_ai_pddl(ai_domain, ai_problem, self._env)

        requirements = extract_pddl_requirements(domain_str)
        if check_ai_pddl_requirements(requirements):
            try:
                ai_domain = DomainParser()(domain_str)
                # A missing problem is legal (domain-only parsing): do not hand
                # None to the parser, mirroring the forced AI Planning path above.
                ai_problem = (
                    ProblemParser()(problem_str) if problem_str is not None else None
                )
            except Exception as e:
                if self._deactivate_fallback:
                    raise
                if not self._disable_warnings:
                    warn(
                        f"The problem could not be converted using the AI Planning reader due to an issue in the AI PDDL parser: {e}"
                    )
            else:
                try:
                    return convert_problem_from_ai_pddl(
                        ai_domain, ai_problem, self._env
                    )
                except UPUnsupportedProblemTypeError as e:
                    if self._deactivate_fallback:
                        raise
                    if not self._disable_warnings:
                        warn(
                            f"The problem could not be converted using the AI Planning reader due to an issue in the UP converter: {e}"
                        )

        # Fallback: requirements not supported, or the AI Planning route failed.
        return self._get_up_pddl_reader().parse_problem_string(
            domain_str, problem_str
        )

    def parse_plan(
        self,
        problem: "up.model.Problem",
        plan_filename: str,
        get_item_named: typing.Optional[
            Callable[
                [str],
                "up.io.pddl_writer.WithName",
            ]
        ] = None,
    ) -> "up.plans.Plan":
        """
        Takes a problem, a filename and optionally a map of renaming and returns
        the plan parsed from the file.

        The format of the file must be:
        ``(action-name param1 param2 ... paramN)`` in each line for SequentialPlans
        ``start-time: (action-name param1 param2 ... paramN) [duration]`` in each
        line for TimeTriggeredPlans, where ``[duration]`` is optional and not
        specified for InstantaneousActions.

        :param problem: The up.model.problem.Problem instance for which the plan
            is generated.
        :param plan_filename: The path of the file in which the plan is written.
        :param get_item_named: A function that takes a name and returns the
            original up.model element instance linked to that renaming; if None
            the problem is used to retrieve the actions and objects in the plan
            from their name.
        :return: The up.plans.Plan corresponding to the parsed plan from the file
        """
        with open(plan_filename, encoding="utf-8-sig") as plan:
            return self.parse_plan_string(problem, plan.read(), get_item_named)

    def parse_plan_string(
        self,
        problem: "up.model.Problem",
        plan_str: str,
        get_item_named: typing.Optional[
            Callable[
                [str],
                "up.io.pddl_writer.WithName",
            ]
        ] = None,
    ) -> "up.plans.Plan":
        """
        Takes a problem, a string and optionally a map of renaming and returns the
        plan parsed from the string. Plan parsing is always delegated to the
        `UPPDDLReader`.

        The format of the string must be:
        ``(action-name param1 param2 ... paramN)`` in each line for SequentialPlans
        ``start-time: (action-name param1 param2 ... paramN) [duration]`` in each
        line for TimeTriggeredPlans, where ``[duration]`` is optional and not
        specified for InstantaneousActions.

        :param problem: The up.model.problem.Problem instance for which the plan
            is generated.
        :param plan_str: The plan in string.
        :param get_item_named: A function that takes a name and returns the
            original up.model element instance linked to that renaming; if None
            the problem is used to retrieve the actions and objects in the plan
            from their name.
        :return: The up.plans.Plan corresponding to the parsed plan from the string
        """
        return self._get_up_pddl_reader().parse_plan_string(
            problem, plan_str, get_item_named
        )
def extract_pddl_requirements(domain_str: str) -> List[str]:
    """
    Extract the requirements from the given domain in a List of requirements
    strings.

    For example if the requirements are `(:requirements :strips :typing)`
    returns: `[":strips", ":typing"]`

    The `(:requirements ...)` section may span several lines and may share a
    line with other parenthesised expressions; `;` comments are ignored. Only
    the first section found is considered.

    :param domain_str: the domain str from which the requirements have to be
        extracted.
    :return: The `List[str]` of requirements extracted from the domain; empty if
        the domain declares no (or an empty) requirements section.
    """
    # Drop PDDL comments (";" to end of line) so a commented-out section is not
    # picked up and comment words do not leak into the result.
    uncommented = "\n".join(
        line.split(";", 1)[0] for line in domain_str.splitlines()
    )
    # "\s" and "[^)]" both match newlines, so a single search over the whole
    # text handles multi-line sections without tracking line boundaries.
    match = re.search(r"\(:requirements\s+([^)]+)\)", uncommented)
    if match is None:
        return []
    return match.group(1).split()