Source code for aiida_restapi.filter_syntax

# -*- coding: utf-8 -*-
"""The AiiDA QueryBuilder filter grammar resolver.

Converts the string into a dict that can be passed to
``QueryBuilder().append(..., filters=filters)``.

This grammar was originally adapted from:
https://github.com/Materials-Consortia/OPTIMADE/blob/master/optimade.rst#the-filter-language-ebnf-grammar
"""
# pylint: disable=too-many-branches
from importlib import resources
from typing import Any, Callable, Dict, List, Optional, Union

from lark import Lark, Token, Tree

from . import static
from .utils import parse_date

FILTER_GRAMMAR = resources.open_text(static, "filter_grammar.lark")

FILTER_PARSER = Lark(FILTER_GRAMMAR, start="filter")

_converters: Dict[str, Callable[[str], Any]] = {
    "FLOAT": float,
    "STRING": lambda s: s[1:-1],
    "PROPERTY": str,
    "INTEGER": int,
    "DIGITS": int,
    "DATE": parse_date,
    "TIME": parse_date,
    "DATETIME": parse_date,
}


[docs]def _parse_value(value: Token) -> Union[int, float, str]: """Parse a value token""" return _converters[value.type](value.value)
[docs]def _parse_valuelist(valuelist: Tree) -> List[Union[int, float, str]]: """Parse a valuelist tree.""" output = [] for child in valuelist.children: try: if child.data != "value": continue except AttributeError: continue output.append(_parse_value(child.children[0])) return output
[docs]def parse_filter_str(string: Optional[str]) -> Dict[str, Any]: """Parse a filter string to a list of ``QueryBuilder`` compliant operators.""" filters: Dict[str, Any] = {} if not string: return filters try: tree = FILTER_PARSER.parse(string) except Exception as err: raise ValueError(f"Malformed filter string: {err}") from err for child in tree.children: try: if child.data != "comparison": continue except AttributeError: continue # the first child will always be the property prop_token, rhs_tree = child.children[:2] # the first child will always be the comparator rhs_compare = rhs_tree.children[0] # parse the comparator value: Any if rhs_compare.data == "value_op_rhs": operator = rhs_compare.children[0].value.strip() value = _parse_value(rhs_compare.children[1].children[0]) elif rhs_compare.data == "fuzzy_string_op_rhs": operator = rhs_compare.children[0].type.lower() value = _parse_value(rhs_compare.children[-1]) elif rhs_compare.data == "length_op_rhs": operator = "of_length" value = _parse_value(rhs_compare.children[-1]) elif rhs_compare.data == "contains_op_rhs": operator = "contains" value = _parse_valuelist(rhs_compare.children[-1]) elif rhs_compare.data == "is_in_op_rhs": operator = "in" value = _parse_valuelist(rhs_compare.children[-1]) elif rhs_compare.data == "has_op_rhs": operator = "has_key" value = _parse_value(rhs_compare.children[-1]) else: raise ValueError(f"Unknown comparison: {rhs_compare.data}") if prop_token.value in filters: if "and" not in filters[prop_token.value]: current = filters.pop(prop_token.value) filters[prop_token.value] = {"and": [current]} filters[prop_token.value]["and"].append({operator: value}) else: filters[prop_token.value] = {operator: value} return filters