Source code for dae.backends.attributes_query
# pylint: disable=too-few-public-methods
import enum
from functools import reduce
from lark import Lark, InlineTransformer
from lark.visitors import Interpreter
from lark.reconstruct import Reconstructor
from dae.variants.core import Allele
from dae.variants.attributes import Role, Inheritance, Sex
from dae.variants.variant import allele_type_from_name
# Lark grammar for attribute query expressions.
#
# An expression is a chain of `or` / `and` over "molecules"; a molecule is a
# function call (`any(...)`, `all(...)`, `eq(...)`, names case-insensitive),
# a `not` negation, or an atom (parenthesised expression, numeric comparison
# or a plain/quoted string argument).
#
# NOTE(review): the comparison rule names read inverted relative to the
# symbols they match (`less_than` matches ">", `more_than` matches "<", and
# likewise for the `_eq` variants). Presumably the name describes the
# threshold rather than the tested value -- confirm before "fixing" either
# side, since the tree/SQL transformers depend on these rule names.
QUERY_GRAMMAR = """
?start: expression
?expression: logical_or
?logical_or: logical_and (" " "or" " " logical_and)*
?logical_and: molecule (" " "and" " " molecule)*
?molecule: functions
| negation
| atom
negation: "not" " " molecule -> negation
?functions: any | all | eq
any: "any"i "(" _arglist ")"
all: "all"i "(" _arglist ")"
eq: "eq"i "(" _simplearglist ")"
?atom: "(" logical_or ")" | less_than | more_than | less_than_eq
| more_than_eq | arg
less_than: ">" SIGNED_NUMBER
more_than: "<" SIGNED_NUMBER
less_than_eq: ">=" SIGNED_NUMBER
more_than_eq: "<=" SIGNED_NUMBER
arg: simple_arg
simple_arg: STRING | "'" STRING "'" | "\\"" STRING "\\""
STRING : ("_"|LETTER|DIGIT|"."|"-"|"'"|"+")+
_arglist: (arg "," )* arg [","]
_simplearglist: (simple_arg "," )* simple_arg [","]
%import common.SIGNED_NUMBER -> SIGNED_NUMBER
%import common.LETTER -> LETTER
%import common.DIGIT -> DIGIT
%import common.WS_INLINE -> WS
%ignore WS
"""
# Parser built from the same grammar with Lark's `ambiguity="explicit"`
# option (see the Lark documentation for how ambiguous parses are reported).
parser_with_ambiguity = Lark(QUERY_GRAMMAR, ambiguity="explicit")
# Default parser, built once at import time; used as the default `parser`
# argument by the transformer/matcher classes in this module.
PARSER = Lark(QUERY_GRAMMAR)
class Matcher:
    """Pair a query tree with the callable that evaluates it.

    Attributes:
        matcher: callable invoked with a ``set`` of values by :meth:`match`.
        tree: the query tree the matcher was built from.
        parser: the Lark parser associated with the query; also used to
            build a ``Reconstructor`` stored on ``_reconstructor``.
    """

    def __init__(self, tree, parser, matcher):
        # Internal invariants: all three collaborators are required.
        assert matcher is not None
        assert tree is not None
        assert parser is not None
        self.matcher = matcher
        self.tree = tree
        self.parser = parser
        self._reconstructor = Reconstructor(parser)

    def match(self, array):
        """Evaluate the stored matcher against ``array``.

        ``array`` may be any iterable of hashable values; it is converted to
        a ``set`` unless it already is one (an existing set is passed through
        unchanged, without copying). Returns whatever the matcher returns.
        """
        values = array if isinstance(array, set) else set(array)
        return self.matcher(values)
class BaseQueryTransformerMatcher:
    """Base class for query transformer matchers.

    Holds the parser and the string-query-to-tree transformer. Subclasses
    supply :meth:`transform_tree_to_matcher` to turn the intermediate tree
    into a matcher object.
    """

    def __init__(self, parser=PARSER, token_converter=None):
        self.parser = parser
        self.transformer = StringQueryToTreeTransformer(
            parser, token_converter
        )
        # Second-stage transformer; left unset here and assigned by
        # subclasses that need one.
        self.transformer2 = None

    def transform_query_string_to_tree(self, expression):
        """Parse ``expression`` and transform the parse result to a tree."""
        parsed = self.parser.parse(expression)
        return self.transformer.transform(parsed)

    def transform_tree_to_matcher(self, tree):
        """Build a matcher from a transformed tree; subclasses override.

        Declared here so that calling :meth:`transform` on a class that does
        not provide this hook fails with an explicit ``NotImplementedError``
        rather than an ``AttributeError``.
        """
        raise NotImplementedError()

    def transform(self, tree):
        """Transform a parse tree and convert the result to a matcher."""
        intermediate = self.transformer.transform(tree)
        return self.transform_tree_to_matcher(intermediate)
class StringQueryToTreeTransformer(InlineTransformer):
    # pylint: disable=no-self-use
    """Convert tokens using a token converter.

    Args:
        parser: accepted for signature compatibility; not used here.
        token_converter: callable applied to tokens; defaults to the
            identity function when ``None``.
    """

    def __init__(self, parser=PARSER, token_converter=None):
        # pylint: disable=unused-argument
        super().__init__()
        if token_converter is not None:
            self.token_converter = token_converter
        else:
            # No converter supplied: pass tokens through unchanged.
            self.token_converter = lambda x: x
class Leaf:
    # pylint: disable=invalid-name
    """Simple container pairing an operator (``op``) with a ``value``."""

    def __init__(self, op, value):
        self.op = op
        self.value = value
class BaseTreeTransformer:
    """Dispatch query tree nodes to handler methods named after their class.

    Subclasses define one method per node class name (for example
    ``NotNode``); :meth:`transform` looks the handler up by
    ``type(node).__name__``.
    """

    def transform(self, node):
        """Recursively transform ``node`` and return the handler's result.

        ``TreeNode`` instances have their children transformed first and the
        handler receives the list of transformed children; any other node is
        treated as a leaf and the handler receives ``node.arg``.

        Raises:
            AttributeError: if no handler exists for the node's class name.
        """
        handler_name = type(node).__name__
        if isinstance(node, TreeNode):
            transformed = [self.transform(child) for child in node.children]
            return getattr(self, handler_name)(transformed)
        return getattr(self, handler_name)(node.arg)
class QueryTreeToLambdaTransformer(BaseTreeTransformer):
    # pylint: disable=no-self-use,invalid-name
    """Transform all nodes to Python lambda functions."""

    def NotNode(self, children):
        """Return a predicate that negates the single child predicate."""
        assert len(children) == 1
        negated = children[0]
        return lambda x: not negated(x)
class QueryTreeToBitwiseLambdaTransformer(BaseTreeTransformer):
    # pylint: disable=no-self-use,invalid-name
    """Transform tree nodes to Python callables for bitwise-style queries.

    NOTE(review): presumably the counterpart of
    ``QueryTreeToLambdaTransformer`` for values encoded as bit flags --
    confirm against the remaining node handlers.
    """

    def NotNode(self, children):
        """Return a predicate that negates the single child predicate."""
        assert len(children) == 1
        negated = children[0]
        return lambda x: not negated(x)
def inheritance_converter(arg):
    """Return ``arg`` as an ``Inheritance`` value.

    ``Inheritance`` instances are returned unchanged; anything else is
    looked up with ``Inheritance.from_name``.
    """
    if isinstance(arg, Inheritance):
        return arg
    return Inheritance.from_name(arg)
def variant_type_converter(arg):
    """Return ``arg`` as an ``Allele.Type`` value.

    ``Allele.Type`` instances are returned unchanged; anything else is
    resolved with ``allele_type_from_name``.
    """
    if isinstance(arg, Allele.Type):
        return arg
    return allele_type_from_name(arg)
class QueryTreeToSQLTransformer(BaseTreeTransformer):
    # pylint: disable=invalid-name
    """Render query tree nodes as SQL condition strings for one column.

    Args:
        column_name: name of the column the conditions refer to.
        add_unnest: stored on the instance for subclasses that emit an
            ``UNNEST``-based form of their conditions.

    NOTE(review): conditions are assembled by string concatenation, so
    ``column_name`` and the converted tokens must come from trusted or
    already-validated input.
    """

    def __init__(self, column_name, add_unnest=False):
        self.column_name = column_name
        self.add_unnest = add_unnest
        super().__init__()

    @staticmethod
    def token_converter(arg):
        """Return the SQL text for ``arg``: an enum's value, else ``str``."""
        if isinstance(arg, enum.Enum):
            return str(arg.value)
        return str(arg)

    def GreaterThanEqNode(self, arg):
        """Return ``<column> <= <arg>``.

        The ``<=`` mirrors the grammar in this module, where the
        ``more_than_eq`` rule matches the ``<=`` symbol.
        """
        return self.column_name + " <= " + self.token_converter(arg)

    @staticmethod
    def NotNode(children):
        """Wrap the single child condition in ``NOT (...)``."""
        assert len(children) == 1
        return "NOT (" + children[0] + ")"

    @staticmethod
    def AndNode(children):
        """Join child conditions with ``AND`` inside one pair of parens."""
        joined = reduce(
            lambda left, right: left + " AND " + right, children
        )
        return "(" + joined + ")"

    @staticmethod
    def OrNode(children):
        """Join child conditions with ``OR`` inside one pair of parens."""
        joined = reduce(
            lambda left, right: left + " OR " + right, children
        )
        return "(" + joined + ")"
class QueryTreeToSQLListTransformer(QueryTreeToSQLTransformer):
    """Render query nodes as SQL conditions using array functions.

    The emitted SQL applies ``array_contains`` and ``concat_ws`` to the
    column. NOTE(review): presumably the column is an array type --
    confirm against the schema.
    """

    def ContainsNode(self, arg):
        """Return ``array_contains(<column>, <arg>)``."""
        token = self.token_converter(arg)
        return "array_contains(" + self.column_name + ", " + token + ")"

    def ElementOfNode(self, arg):
        """Return ``<column> IN (...)``, or ``IS NULL`` for an empty list."""
        if not arg:
            return self.column_name + " IS NULL"
        tokens = [self.token_converter(item) for item in arg]
        return self.column_name + " IN (" + ",".join(tokens) + ")"

    def EqualsNode(self, arg):
        """Compare the column with the given values via ``concat_ws``.

        Both sides are flattened with ``concat_ws('|', ...)`` and compared
        as strings.
        """
        tokens = [self.token_converter(item) for item in arg]
        # reduce (rather than join) keeps the original failure mode for an
        # empty argument list.
        listed = reduce(lambda left, right: left + ", " + right, tokens)
        return (
            "concat_ws('|',"
            + self.column_name
            + ")"
            + " = concat_ws('|', array("
            + listed
            + "))"
        )
class QueryTreeToSQLBitwiseTransformer(QueryTreeToSQLTransformer):
    """Render query nodes as SQL conditions using bitwise AND tests.

    ``ContainsNode`` and ``ElementOfNode`` both emit a "column AND token is
    non-zero" test; with ``add_unnest`` set, the test is written as a
    ``BIT_AND`` aggregate over ``UNNEST`` instead of ``BITAND``.
    """

    def _bitwise_and_condition(self, arg):
        """Return the SQL test that column and ``arg`` share a set bit."""
        converted_token = self.token_converter(arg)
        if self.add_unnest:
            return f"((SELECT BIT_AND(x) FROM UNNEST([{self.column_name}, "\
                f"{converted_token}]) as x) != 0)"
        return f"(BITAND({self.column_name}, {converted_token}) != 0)"

    def ContainsNode(self, arg):
        """Return the bitwise-AND test of the column against ``arg``."""
        return self._bitwise_and_condition(arg)

    def ElementOfNode(self, arg):
        """Return the bitwise-AND test of the column against ``arg``."""
        return self._bitwise_and_condition(arg)

    @staticmethod
    def NotNode(children):
        """Wrap the single child condition in ``(NOT (...))``."""
        assert len(children) == 1
        return f"(NOT ({children[0]}))"

    @staticmethod
    def AndNode(children):
        """Combine child conditions pairwise as ``(a) AND (b)``."""
        return reduce(lambda x, y: f"({x}) AND ({y})", children)

    @staticmethod
    def OrNode(children):
        """Combine child conditions pairwise as ``(a) OR (b)``."""
        return reduce(lambda x, y: f"({x}) OR ({y})", children)
# class RegionsQueryToSQLTransformer(object):
#
# def __init__(self):
# pass
#
# def parse(self, regions):
# assert all([isinstance(r, Region) for r in regions])
# pass
#
# def parse_region(self, region):
# assert isinstance(region, Region)
# return {
# 'chrom': EqualsNode(region.chr),
# 'position': AndNode([
# GreaterThanEqNode(region.start),
# LessThanEqNode(region.stop),
# ])
# }
class StringQueryToTreeTransformerWrapper:
    """Bundle a parser with a ``StringQueryToTreeTransformer``.

    Args:
        parser: Lark parser to store; defaults to the module ``PARSER``.
        token_converter: forwarded to ``StringQueryToTreeTransformer``.
    """

    def __init__(self, parser=PARSER, token_converter=None):
        self.parser = parser
        self.transformer = StringQueryToTreeTransformer(
            parser, token_converter
        )
class StringListQueryToTreeTransformer:
    """Turn a plain list of values into an ``ElementOfNode`` query tree."""

    @staticmethod
    def parse_and_transform(expression):
        """Wrap ``expression`` (which must be a ``list``) in ``ElementOfNode``.

        No parsing takes place; the list is passed to the node as-is.
        """
        assert isinstance(expression, list)
        return ElementOfNode(expression)
class BitwiseTreeTransformer(Interpreter):
    # pylint: disable=invalid-name,no-self-use
    """Transform bitwise expressions.

    Args:
        token_converter: callable stored on the instance for converting
            tokens. The parser is always the module-level ``PARSER``.
    """

    def __init__(self, token_converter):
        super().__init__()
        self.parser = PARSER
        self.token_converter = token_converter
class QueryTransformerMatcher(BaseQueryTransformerMatcher):
    """Build :class:`Matcher` objects from query trees.

    Args:
        parser: Lark parser; defaults to the module ``PARSER``.
        token_converter: forwarded to the base class.
        transformer2: tree transformer that produces the matcher callable.
            When ``None`` (the default) a new
            ``QueryTreeToLambdaTransformer`` is created for this instance.
    """

    def __init__(
            self, parser=PARSER, token_converter=None,
            transformer2=None):
        super().__init__(parser, token_converter)
        # Create the default here rather than in the signature: a call in a
        # default argument is evaluated once at definition time and the
        # resulting object would be shared by every instance.
        if transformer2 is None:
            transformer2 = QueryTreeToLambdaTransformer()
        self.transformer2 = transformer2

    def transform_tree_to_matcher(self, tree):
        """Transform ``tree`` with ``transformer2`` and wrap it in a Matcher."""
        matcher = self.transformer2.transform(tree)
        return Matcher(tree, self.parser, matcher)
# Ready-made query matchers, created once at import time. Each one uses the
# default parser and differs only in the token converter it applies.
role_query = QueryTransformerMatcher(token_converter=roles_converter)
sex_query = QueryTransformerMatcher(token_converter=sex_converter)
inheritance_query = QueryTransformerMatcher(
token_converter=inheritance_converter
)
# Variant types additionally swap in the bitwise lambda transformer as the
# second-stage (tree -> callable) transformer.
variant_type_query = QueryTransformerMatcher(
token_converter=variant_type_converter,
transformer2=QueryTreeToBitwiseLambdaTransformer()
)