|
|
|
@ -20,19 +20,92 @@
|
|
|
|
|
'''This module offers capability of executing relational queries in parallel.'''
|
|
|
|
|
|
|
|
|
|
import optimizer
|
|
|
|
|
import multiprocessing
|
|
|
|
|
import parser
|
|
|
|
|
|
|
|
|
|
def weight (n,rels):
|
|
|
|
|
'''This function returns a weight that tries to give an approssimation of the
|
|
|
|
|
time that will be required to execute the expression'''
|
|
|
|
|
if n.kind==optimizer.RELATION: #Weight of a relation is its size
|
|
|
|
|
r=rels[n.name]
|
|
|
|
|
return len(r.content) * len(r.header.attributes)
|
|
|
|
|
elif n.kind==optimizer.BINARY and n.name=='ρ':
|
|
|
|
|
pass
|
|
|
|
|
elif n.kind==optimizer.BINARY and n.name=='σ':
|
|
|
|
|
pass
|
|
|
|
|
elif n.kind==optimizer.BINARY and n.name=='π':
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def execute(tree,rels):
|
|
|
|
|
'''This funcion executes a query in parallel.
|
|
|
|
|
Tree is the tree describing the query (usually obtained with
|
|
|
|
|
parser.tree(querystring)
|
|
|
|
|
rels is a dictionary containing the relations associated with the names'''
|
|
|
|
|
|
|
|
|
|
q = multiprocessing.Queue()
|
|
|
|
|
p = multiprocessing.Process(target=__p_exec__, args=(tree,rels,q,))
|
|
|
|
|
p.start()
|
|
|
|
|
result= q.get()
|
|
|
|
|
p.join()
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def __p_exec__(tree,rels,q):
|
|
|
|
|
'''q is the queue used for communication'''
|
|
|
|
|
if tree.kind==parser.RELATION:
|
|
|
|
|
q.put(rels[tree.name])
|
|
|
|
|
elif tree.kind==parser.UNARY:
|
|
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
#Obtain the relation
|
|
|
|
|
temp_q = multiprocessing.Queue()
|
|
|
|
|
__p_exec__(tree.child,rels,temp_q)
|
|
|
|
|
rel=temp_q.get()
|
|
|
|
|
|
|
|
|
|
#Execute the query
|
|
|
|
|
result=__p_exec_unary__(tree,rel)
|
|
|
|
|
q.put(result)
|
|
|
|
|
elif tree.kind==parser.BINARY:
|
|
|
|
|
left_q = multiprocessing.Queue()
|
|
|
|
|
left_p = multiprocessing.Process(target=__p_exec__, args=(tree.left,rels,left_q,))
|
|
|
|
|
right_q = multiprocessing.Queue()
|
|
|
|
|
right_p = multiprocessing.Process(target=__p_exec__, args=(tree.right,rels,right_q,))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#Spawn the children
|
|
|
|
|
left_p.start()
|
|
|
|
|
right_p.start()
|
|
|
|
|
|
|
|
|
|
#Get the left and right relations
|
|
|
|
|
left= left_q.get()
|
|
|
|
|
right= right_q.get()
|
|
|
|
|
|
|
|
|
|
#Wait for the children to terminate
|
|
|
|
|
left_p.join()
|
|
|
|
|
right_p.join()
|
|
|
|
|
|
|
|
|
|
result = __p_exec_binary__(tree,left,right)
|
|
|
|
|
q.put(result)
|
|
|
|
|
return
|
|
|
|
|
def __p_exec_binary__(tree,left,right):
|
|
|
|
|
if tree.name=='*':
|
|
|
|
|
return left.product(right)
|
|
|
|
|
elif tree.name=='-':
|
|
|
|
|
return left.difference(right)
|
|
|
|
|
elif tree.name=='ᑌ':
|
|
|
|
|
return left.union(right)
|
|
|
|
|
elif tree.name=='ᑎ':
|
|
|
|
|
return left.intersection(right)
|
|
|
|
|
elif tree.name=='÷':
|
|
|
|
|
return left.division(right)
|
|
|
|
|
elif tree.name=='ᐅᐊ':
|
|
|
|
|
return left.join(right)
|
|
|
|
|
elif tree.name=='ᐅLEFTᐊ':
|
|
|
|
|
return left.outer_left(right)
|
|
|
|
|
elif tree.name=='ᐅRIGHTᐊ':
|
|
|
|
|
return left.outer_right(right)
|
|
|
|
|
else: # tree.name=='ᐅFULLᐊ':
|
|
|
|
|
return left.outer(right)
|
|
|
|
|
|
|
|
|
|
def __p_exec_unary__(tree,rel):
|
|
|
|
|
if tree.name=='π':#Projection
|
|
|
|
|
tree.prop=tree.prop.replace(' ','').split(',')
|
|
|
|
|
result= rel.projection(tree.prop)
|
|
|
|
|
elif tree.name=="ρ": #Rename
|
|
|
|
|
#tree.prop='{\"%s\"}' % tree.prop.replace(',','\",\"').replace('➡','\":\"').replace(' ','')
|
|
|
|
|
d={}
|
|
|
|
|
tree.prop=tree.prop.replace(' ','')
|
|
|
|
|
for i in tree.prop.split(','):
|
|
|
|
|
rename_=i.split('➡')
|
|
|
|
|
d[rename_[0]]=rename_[1]
|
|
|
|
|
|
|
|
|
|
result= rel.rename(d)
|
|
|
|
|
else: #Selection
|
|
|
|
|
result= rel.selection(tree.prop)
|
|
|
|
|
return result
|
|
|
|
|
|