### Utilities for evaluating a parsed SPARQL expression using sparql-p
import rdflib
from rdfextras.sparql import operators
from rdfextras.sparql.query import SessionBNode
from rdflib.namespace import RDF
from rdflib.term import URIRef, Variable, BNode, Literal, Identifier
from rdflib.term import XSDToPython
from rdfextras.sparql import _questChar
from rdfextras.sparql import SPARQLError
from rdfextras.sparql.components import IRIRef # , NamedGraph, RemoteGraph
from rdfextras.sparql.components import (
BinaryOperator,
BuiltinFunctionCall,
EqualityOperator,
FunctionCall,
GreaterThanOperator,
GreaterThanOrEqualOperator,
LessThanOperator,
LessThanOrEqualOperator,
ListRedirect,
LogicalNegation,
NotEqualOperator,
NumericNegative,
ParsedAdditiveExpressionList,
ParsedCollection,
ParsedConditionalAndExpressionList,
ParsedConstrainedTriples,
ParsedDatatypedLiteral,
ParsedREGEXInvocation,
ParsedRelationalExpressionList,
ParsedString,
QName,
QNamePrefix,
RDFTerm,
UnaryOperator,
FUNCTION_NAMES)
class Unbound:
    """
    Wrapper around the name of a SPARQL query variable.

    Intended to be used together with
    :class:`rdfextras.sparql.graph.BasicGraphPattern`.
    """

    def __init__(self, name):
        """
        :param name: the name of the variable (without the '?' character)
        :type name: unicode or string
        :raises SPARQLError: when ``name`` is not a string
        """
        # Guard clause: reject non-string names before touching the instance.
        if not isinstance(name, basestring):
            raise SPARQLError(
                "illegal argument, variable name must be a string or unicode")
        # ``origName`` keeps the bare name; ``name`` carries the module's
        # variable marker (``_questChar``) in front of it.
        self.origName = name
        self.name = _questChar + name

    def __repr__(self):
        """Return the variable in SPARQL notation, e.g. ``?x``."""
        return "?%s" % self.origName

    def __str__(self):
        """Same text as :meth:`__repr__`."""
        return self.__repr__()
# Module-level debug flag. The functions visible in this module consult
# ``prolog.DEBUG`` for their debug output rather than this value.
DEBUG = False
# Template of the Python source emitted by mapToOperator for each binary
# operator class. The three ``%s`` slots are filled, in order, with the
# translated left operand, the translated right operand and the optional
# "(<combinationArg>)" invocation suffix.
BinaryOperatorMapping = {
LessThanOperator: 'operators.lt(%s,%s)%s',
EqualityOperator: 'operators.eq(%s,%s)%s',
NotEqualOperator: 'operators.neq(%s,%s)%s',
LessThanOrEqualOperator: 'operators.le(%s,%s)%s',
GreaterThanOperator: 'operators.gt(%s,%s)%s',
GreaterThanOrEqualOperator: 'operators.ge(%s,%s)%s',
}
# Template of the Python source emitted by mapToOperator for each unary
# operator class; the single ``%s`` slot receives the translated argument.
UnaryOperatorMapping = {
LogicalNegation: 'not(%s)',
NumericNegative: '-(%s)',
}
# mapToOperator lower-cases builtin function names before looking them up;
# these entries restore the camel-cased attribute names to use on the
# ``operators`` module. Names not listed here are emitted as
# 'operators.' + <lower-cased name>.
CAMEL_CASE_BUILTINS = {
'isuri': 'operators.isURI',
'isiri': 'operators.isIRI',
'isblank': 'operators.isBlank',
'isliteral': 'operators.isLiteral',
}
class Resolver:
    """Minimal resolver used to combine a base URI with a reference."""

    supportedSchemas = [None]

    def normalize(self, uriRef, baseUri):
        """
        Return ``baseUri`` with ``uriRef`` appended.

        This is plain concatenation with ``+``; no RFC 3986 style reference
        resolution is performed.
        """
        combined = baseUri + uriRef
        return combined
class BNodeRef(BNode):
    """
    Explicit reference to a persistent BNode in the data set.

    Using the "_:x" syntax to point at a named BNode is technically a
    violation of the SPARQL spec, but it is also very useful. When an
    undistinguished variable is what is wanted, an actual variable serves
    as a trivial workaround.

    Support for these references can be turned off by disabling the
    'EVAL_OPTION_ALLOW_BNODE_REF' evaluation option.

    Also known as special 'session' BNodes, i.e. BNodes on the query side
    that refer to BNodes in persistence.
    """
def convertTerm(term, queryProlog):
    """
    Convert a parsed triple component into the term used for evaluation.

    Handling by type of ``term`` (checked in this order):

    * ``Variable`` - replaced by its pre-bound value when ``queryProlog``
      has a ``variableBindings`` mapping containing it (bound BNodes are
      wrapped in :class:`BNodeRef`); otherwise returned unchanged.
    * ``BNode`` (including :class:`BNodeRef`) - returned unchanged.
    * ``QName`` - expanded to a ``URIRef`` using the prolog's base
      declaration and prefix bindings; the ``_`` prefix yields a
      ``SessionBNode`` and emits a ``SyntaxWarning``.
    * ``QNamePrefix`` - turned into a ``URIRef``, prefixed with the base
      declaration when there is one.
    * ``ParsedString`` / ``ParsedDatatypedLiteral`` - turned into a
      ``Literal``.
    * ``IRIRef`` - prefixed with the base declaration when there is one.
    * anything else - returned unchanged.

    :param term: the parsed component to convert
    :param queryProlog: the query prolog (``baseDeclaration``,
        ``prefixBindings`` and optionally ``variableBindings`` are read
        from it), or None
    :returns: the converted term
    """
    if isinstance(term, Variable):
        if hasattr(queryProlog, 'variableBindings') \
                and term in queryProlog.variableBindings:
            # Resolve pre-bound variables at SQL generation time
            # for SPARQL-to-SQL invocations
            rt = queryProlog.variableBindings.get(term, term)
            if isinstance(rt, BNode):
                return BNodeRef(rt)
            return rt
        return term
    elif isinstance(term, BNode):
        # BNodeRef subclasses BNode, so this covers both cases.
        return term
    elif isinstance(term, QName):
        # QNames and QName prefixes are the same in the grammar
        if not term.prefix:
            if queryProlog is None:
                return URIRef(term.localname)
            if queryProlog.baseDeclaration \
                    and u'' in queryProlog.prefixBindings \
                    and queryProlog.prefixBindings[u'']:
                # Both a base and a default prefix: the default prefix is
                # taken relative to the base.
                base = URIRef(Resolver().normalize(
                    queryProlog.prefixBindings[u''],
                    queryProlog.baseDeclaration))
            elif queryProlog.baseDeclaration:
                base = queryProlog.baseDeclaration
            else:
                # NOTE(review): fails with a lookup error when the prolog
                # declares neither a base nor a default prefix.
                base = queryProlog.prefixBindings[u'']
            return URIRef(Resolver().normalize(term.localname, base))
        elif term.prefix == '_':
            # Told BNode, see:
            # http://www.w3.org/2001/sw/DataAccess/issues#bnodeRef
            # An earlier, disabled variant chose between BNodeRef and a
            # renamed BNode based on EVAL_OPTION_ALLOW_BNODE_REF; today the
            # identifier is always taken verbatim as a session BNode.
            import warnings
            warnings.warn(
                "The verbatim interpretation of explicit bnode "
                "identifiers is contrary to (current) DAWG stance",
                SyntaxWarning)
            return SessionBNode(term.localname)
        else:
            return URIRef(Resolver().normalize(
                term.localname,
                queryProlog.prefixBindings[term.prefix]))
    elif isinstance(term, QNamePrefix):
        if queryProlog is None or queryProlog.baseDeclaration is None:
            return URIRef(term)
        return URIRef(Resolver().normalize(
            term, queryProlog.baseDeclaration))
    elif isinstance(term, ParsedString):
        return Literal(term)
    elif isinstance(term, ParsedDatatypedLiteral):
        dT = term.dataType
        if isinstance(dT, QName):
            dT = convertTerm(dT, queryProlog)
        return Literal(term.value, datatype=dT)
    elif isinstance(term, IRIRef) \
            and queryProlog is not None \
            and queryProlog.baseDeclaration:
        # The explicit None check keeps this branch consistent with the
        # QName/QNamePrefix branches, which accept a missing prolog.
        return URIRef(Resolver().normalize(
            term, queryProlog.baseDeclaration))
    else:
        return term
def unRollCollection(collection, queryProlog):
    """
    Generate the rdf:first / rdf:rest triples that spell out a parsed
    collection, followed by the triples of any complex members.

    The first list cell is the collection's own identifier; every further
    member gets a fresh ``BNode`` as its cell. Members that are ``RDFTerm``
    instances contribute their identifier to the list and are unrolled
    with :func:`unRollRDFTerm` once the list itself has been emitted.
    All members are treated alike, whatever their position.

    :param collection: parsed collection (``identifier`` and ``_list`` are
        read from it)
    :param queryProlog: prolog handed to :func:`convertTerm`
    :returns: generator of ``(subject, predicate, object)`` tuples; an
        empty collection yields nothing
    """
    listStart = convertTerm(collection.identifier, queryProlog)
    members = collection._list
    if not members:
        # An empty collection has always produced no triples here.
        return
    nestedComplexTerms = []
    prevLink = listStart
    for position, member in enumerate(members):
        # Only members after the first need a freshly minted list cell.
        if position:
            linkNode = convertTerm(BNode(), queryProlog)
        else:
            linkNode = listStart
        if isinstance(member, RDFTerm):
            # Remember complex members so their own triples are emitted
            # after the list structure.
            nestedComplexTerms.append(member)
            member = member.identifier
        yield (linkNode, RDF.first, convertTerm(member, queryProlog))
        if position:
            yield (prevLink, RDF.rest, linkNode)
        prevLink = linkNode
    yield (prevLink, RDF.rest, RDF.nil)
    for additionalItem in nestedComplexTerms:
        for nestedTriple in unRollRDFTerm(additionalItem, queryProlog):
            yield nestedTriple
def unRollRDFTerm(item, queryProlog):
    """
    Generate the triple patterns described by a parsed RDF term.

    One triple is produced per (property, object) pair found in
    ``item.propVals``. When ``item`` is a ``ParsedCollection`` its list
    triples follow, and finally every object that was itself an
    ``RDFTerm`` is unrolled recursively.

    :param item: parsed term (``identifier`` and ``propVals`` are read)
    :param queryProlog: prolog handed to :func:`convertTerm`
    :returns: generator of ``(subject, predicate, object)`` tuples
    """
    pendingTerms = []
    for predicateObjects in item.propVals:
        for target in predicateObjects.objects:
            if isinstance(target, RDFTerm):
                # Complex object: link to its identifier now, expand later.
                pendingTerms.append(target)
                objectTerm = target.identifier
            else:
                objectTerm = target
            yield (convertTerm(item.identifier, queryProlog),
                   convertTerm(predicateObjects.property, queryProlog),
                   convertTerm(objectTerm, queryProlog))
    if isinstance(item, ParsedCollection):
        for listTriple in unRollCollection(item, queryProlog):
            yield listTriple
    for pending in pendingTerms:
        for nestedTriple in unRollRDFTerm(pending, queryProlog):
            yield nestedTriple
def unRollTripleItems(items, queryProlog):
    """
    Takes a list of Triples (nested lists or ParsedConstrainedTriples)
    and (recursively) returns a generator over all the contained triple
    patterns.

    :param items: an ``RDFTerm``, a ``ParsedConstrainedTriples`` or an
        iterable nesting any of these
    :param queryProlog: prolog handed down to :func:`unRollRDFTerm`
    :returns: generator of the triples produced by :func:`unRollRDFTerm`
        for every ``RDFTerm`` reachable from ``items``
    """
    if isinstance(items, RDFTerm):
        for triple in unRollRDFTerm(items, queryProlog):
            yield triple
        return
    if isinstance(items, ParsedConstrainedTriples):
        assert isinstance(items.triples, list)
        items = items.triples
    for item in items:
        # Recursing handles RDFTerm members (first branch above) as well as
        # nested containers. The triples produced by the recursion are what
        # must be yielded, not the container they came from.
        for triple in unRollTripleItems(item, queryProlog):
            yield triple
def mapToOperator(expr, prolog, combinationArg=None, constraint=False):
    """
    Reduces certain expressions (operator expressions, function calls, terms,
    and combinator expressions) into strings of their Python equivalent.

    :param expr: the parsed expression node to translate
    :param prolog: query prolog; ``DEBUG`` and ``extensionFunctions`` are
        read from it and it is handed to :func:`convertTerm`
    :param combinationArg: name of the argument the generated operator is
        applied to (e.g. ``'i'``); when given, ``"(<name>)"`` is appended
        to generated operator calls
    :param constraint: when true, bare variables and datatyped literals are
        wrapped in ``operators.EBV``
    :returns: a string of Python source, or None for an unregistered
        extension function (after a ``UserWarning``)
    :raises NotImplementedError: for a registered extension function (the
        extension hook is not implemented)
    :raises Exception: when the expression type is not recognised
    """
    if prolog.DEBUG:
        print('''mapToOperator:\n\texpr=%s,\n\ttype=%s,\n\tconstr=%s.\n''' % (
            expr, type(expr), constraint))
    if combinationArg:
        combinationInvokation = '(%s)' % combinationArg
    else:
        combinationInvokation = ""
    if isinstance(expr, ListRedirect):
        expr = expr.reduce()
    if isinstance(expr, UnaryOperator):
        return UnaryOperatorMapping[type(expr)] % (
            mapToOperator(expr.argument, prolog, combinationArg,
                          constraint=constraint))
    elif isinstance(expr, BinaryOperator):
        return BinaryOperatorMapping[type(expr)] % (
            mapToOperator(expr.left, prolog, combinationArg,
                          constraint=constraint),
            mapToOperator(expr.right, prolog, combinationArg,
                          constraint=constraint),
            combinationInvokation
        )
    elif isinstance(expr, (Variable, Unbound)):  # GJH the only use of Unbound
        if constraint:
            return """operators.EBV(rdflib.Variable("%s"))%s""" % (
                expr.n3(), combinationInvokation)
        else:
            return '"?%s"' % expr
    elif isinstance(expr, ParsedREGEXInvocation):
        if expr.arg3:
            flagsArgument = ',"' + str(expr.arg3) + '"'
        else:
            flagsArgument = ''
        return 'operators.regex(%s, %s%s)%s' % (
            mapToOperator(expr.arg1, prolog, combinationArg,
                          constraint=constraint),
            mapToOperator(expr.arg2, prolog, combinationArg,
                          constraint=constraint),
            flagsArgument,
            combinationInvokation
        )
    elif isinstance(expr, BuiltinFunctionCall):
        normBuiltInName = FUNCTION_NAMES[expr.name].lower()
        normBuiltInName = CAMEL_CASE_BUILTINS.get(
            normBuiltInName, 'operators.' + normBuiltInName)
        return "%s(%s)%s" % (
            normBuiltInName,
            ",".join([mapToOperator(i, prolog, combinationArg,
                                    constraint=constraint)
                      for i in expr.arguments]),
            combinationInvokation)
    elif isinstance(expr, ParsedDatatypedLiteral):
        lit = Literal(expr.value, datatype=convertTerm(expr.dataType, prolog))
        if constraint:
            return """operators.EBV(%r)%s""" % (lit, combinationInvokation)
        else:
            return repr(lit)
    elif isinstance(expr, (Literal, URIRef)):
        return repr(expr)
    elif isinstance(expr, QName):
        if expr[:2] == '_:':
            return repr(BNode(expr[2:]))
        else:
            # SECURITY NOTE(review): the converted term is spliced between
            # single quotes without escaping and the result is later passed
            # to eval(); a term containing a quote would alter the
            # generated expression.
            return "'%s'" % convertTerm(expr, prolog)
    elif isinstance(expr, basestring):
        # SECURITY NOTE(review): same unescaped quoting as the QName case.
        return "'%s'" % convertTerm(expr, prolog)
    elif isinstance(expr, ParsedAdditiveExpressionList):
        return 'Literal(%s)' % (
            operators.addOperator(
                [mapToOperator(item, prolog, combinationArg='i',
                               constraint=constraint)
                 for item in expr],
                combinationArg))
    elif isinstance(expr, FunctionCall):
        if isinstance(expr.name, QName):
            fUri = convertTerm(expr.name, prolog)
        else:
            # Without this fallback a function named by anything other than
            # a QName left ``fUri`` unbound and crashed on the next line.
            fUri = expr.name
        if fUri in XSDToPython:
            return "operators.XSDCast(%s, '%s')%s" % (
                mapToOperator(expr.arguments[0], prolog, combinationArg='i',
                              constraint=constraint),
                fUri,
                combinationInvokation)
        # @@FIXME The hook for extension functions goes here
        if fUri not in prolog.extensionFunctions:
            import warnings
            warnings.warn(
                "Use of unregistered extension function: %s" % (fUri),
                UserWarning, 1)
            # NOTE(review): block nesting was reconstructed from flattened
            # source; this assumes an unregistered function only warns and
            # yields no operator string (None), not the generic exception
            # below - confirm against the upstream file.
            return None
        # ``NotImplemented`` is a constant, not an exception class, and
        # cannot be called; NotImplementedError is the raisable form.
        raise NotImplementedError(
            "Extension Mechanism hook not yet completely hooked up..")
    else:
        if isinstance(expr, ListRedirect):
            expr = expr.reduce()
            if expr.pyBooleanOperator:
                return expr.pyBooleanOperator.join(
                    [mapToOperator(i, prolog,
                                   constraint=constraint)
                     for i in expr])
        raise Exception("What do i do with %s (a %s)?" % (
            expr, type(expr).__name__))
def createSPARQLPConstraint(filter, prolog):
    """
    Turn a parsed filter into a sparql-p operator.

    The filter expression is translated by :func:`mapToOperator` into a
    string of Python source (lambda functions and calls into ``operators``)
    and that string is passed to ``eval`` to obtain the callable.

    :param filter: parsed filter object; its ``filter`` attribute holds the
        expression
    :param prolog: query prolog (``DEBUG`` is read from it; it is also
        handed to :func:`mapToOperator`)
    :returns: the evaluated operator; for the literal filters
        ``u'true'`` / ``u'false'`` a function that ignores its argument and
        returns the corresponding boolean
    """
    # SECURITY NOTE(review): every return below evals source text built
    # from the parsed query.
    reducedFilter = filter.filter
    if isinstance(reducedFilter, ListRedirect):
        # Keep the unreduced expression when the reduction is falsy.
        reducedFilter = reducedFilter.reduce() or filter.filter
    if prolog.DEBUG:
        print("createSPARQLPConstraint reducedFilter=%s, type=%s" % (
            reducedFilter, type(reducedFilter)))

    # Composite expressions are translated without the EBV wrapping, the
    # one exception being a unary operator applied directly to a variable.
    compositeTypes = (ListRedirect,
                      BinaryOperator,
                      UnaryOperator,
                      BuiltinFunctionCall,
                      ParsedREGEXInvocation)
    asConstraint = not isinstance(reducedFilter, compositeTypes) or (
        isinstance(reducedFilter, UnaryOperator)
        and isinstance(reducedFilter.argument, Variable))
    if prolog.DEBUG:
        print("createSPARQLPConst: reducedFilterType = %s, constraint = %s" % (
            type(reducedFilter), asConstraint))

    if isinstance(reducedFilter, ParsedConditionalAndExpressionList):
        alternatives = []
        for member in reducedFilter:
            alternatives.append('%s' % mapToOperator(
                member, prolog, combinationArg='i', constraint=asConstraint))
        combinationLambda = 'lambda i: %s' % ' or '.join(alternatives)
        if prolog.DEBUG:
            print("a. sparql-p operator(s): %s" % combinationLambda)
        return eval(combinationLambda)

    if isinstance(reducedFilter, ParsedRelationalExpressionList):
        conjuncts = []
        for member in reducedFilter:
            conjuncts.append('%s' % mapToOperator(
                member, prolog, combinationArg='i', constraint=asConstraint))
        combinationLambda = 'lambda i: %s' % ' and '.join(conjuncts)
        if prolog.DEBUG:
            print("b. sparql-p operator(s): %s" % combinationLambda)
        return eval(combinationLambda)

    if isinstance(reducedFilter, BuiltinFunctionCall):
        source = mapToOperator(reducedFilter, prolog,
                               constraint=asConstraint)
        if prolog.DEBUG:
            print("c. sparql-p operator(s): %s" % source)
        return eval(source)

    if isinstance(reducedFilter, (
            ParsedAdditiveExpressionList, UnaryOperator, FunctionCall)):
        source = 'lambda i: %s' % (
            mapToOperator(reducedFilter, prolog, combinationArg='i',
                          constraint=asConstraint))
        if prolog.DEBUG:
            print("d. sparql-p operator(s): %s" % source)
        return eval(source)

    if isinstance(reducedFilter, Variable):
        source = """operators.EBV(rdflib.Variable("%s"))""" % (
            reducedFilter.n3())
        if prolog.DEBUG:
            print("e. sparql-p operator(s): %s" % source)
        return eval(source)

    # Literal boolean filters short-circuit to constant functions.
    if reducedFilter == u'true':
        def trueFn(arg):
            return True
        return trueFn
    if reducedFilter == u'false':
        def falseFn(arg):
            return False
        return falseFn

    source = mapToOperator(reducedFilter, prolog,
                           constraint=asConstraint)
    if prolog.DEBUG:
        print("f. sparql-p operator(s): %s" % source)
    return eval(source)
def isTriplePattern(nestedTriples):
    """
    Determines (recursively) if the BasicGraphPattern contains any Triple
    Patterns returning a boolean flag indicating if it does or not.

    :param nestedTriples: a list (possibly nested), a
        ``ParsedConstrainedTriples`` or any other object
    :returns: for a list, True when any member is a triple pattern; for a
        ``ParsedConstrainedTriples``, the result for its ``triples`` (False
        when they are empty); True for any other object
    """
    if isinstance(nestedTriples, list):
        return any(isTriplePattern(member) for member in nestedTriples)
    if isinstance(nestedTriples, ParsedConstrainedTriples):
        # An empty ``triples`` attribute means there is nothing to match.
        if not nestedTriples.triples:
            return False
        return isTriplePattern(nestedTriples.triples)
    # Anything that is neither a container nor a constrained-triples
    # wrapper counts as a triple pattern.
    return True
# CONSTRUCT_NOT_SUPPORTED = 1
# ExceptionMessages = {
# CONSTRUCT_NOT_SUPPORTED: '"Construct" is not (currently) supported',
# }
# class NotImplemented(Exception):
# def __init__(self, code, msg):
# self.code = code
# self.msg = msg
# def __str__(self):
# return ExceptionMessages[self.code] + ' :' + self.msg