congr-visualizer / src /alignment.py
Shahzaib98's picture
Upload 11 files
d2ff6a7 verified
"""
Adapted from Jonathan Dursi
https://github.com/ljdursi/poapy
"""
import numpy
class SeqGraphAlignment(object):
    """Align a string against a sequence graph with dynamic programming.

    The alignment is computed in the constructor and stored in two parallel
    lists: ``stringidxs`` (indices into ``sequence``) and ``nodeidxs`` (node
    IDs of ``graph``). A ``None`` entry in either list marks a gap on that
    side of the alignment.

    The graph object is only accessed through the following members:
    ``nNodes``, ``nodeiterator()`` (returns a callable that yields the nodes;
    predecessors must be yielded before their successors for the table
    look-ups to be valid), and ``nodedict`` (node ID -> node). Each node must
    expose ``ID``, ``text``, ``inEdges`` (mapping keyed by predecessor ID)
    and ``outDegree``.
    """

    __matchscore = 1
    __mismatchscore = -2
    __gap = -1

    def __init__(
        self,
        sequence,
        graph,
        fastMethod=True,
        globalAlign=False,
        matchscore=__matchscore,
        mismatchscore=__mismatchscore,
        gapscore=__gap,
        *args,
        **kwargs,
    ):
        """Run the alignment and store the result on the instance.

        Args:
            sequence: the string to align (must be a ``str``).
            graph: the graph to align against (see the class docstring).
            fastMethod: use the numpy-vectorised algorithm when true,
                otherwise the plain-Python reference implementation.
            globalAlign: perform a global alignment when true, a local one
                otherwise.
            matchscore: score added when two characters are equal.
            mismatchscore: score added when two characters differ.
            gapscore: score added for a gap on either side.
            *args, **kwargs: forwarded to the selected alignment method.

        Raises:
            TypeError: if ``sequence`` is not a string.
        """
        self._mismatchscore = mismatchscore
        self._matchscore = matchscore
        self._gap = gapscore
        self.sequence = sequence
        self.graph = graph
        self.stringidxs = None
        self.nodeidxs = None
        self.globalAlign = globalAlign
        if fastMethod:
            matches = self.alignStringToGraphFast(*args, **kwargs)
        else:
            matches = self.alignStringToGraphSimple(*args, **kwargs)
        self.stringidxs, self.nodeidxs = matches

    def alignmentStrings(self):
        """Return the aligned ``(sequence, graph)`` strings, gaps shown as "-"."""
        seqPart = "".join(
            self.sequence[i] if i is not None else "-" for i in self.stringidxs
        )
        graphPart = "".join(
            self.graph.nodedict[j].text if j is not None else "-" for j in self.nodeidxs
        )
        return seqPart, graphPart

    def matchscore(self, c1, c2):
        """Return the match score if ``c1 == c2``, else the mismatch score."""
        if c1 == c2:
            return self._matchscore
        return self._mismatchscore

    def matchscoreVec(self, c, v):
        """Vectorised ``matchscore``: score ``c`` against every element of ``v``."""
        return numpy.where(v == c, self._matchscore, self._mismatchscore)

    def alignStringToGraphSimple(self):
        """Align the string to the graph one table cell at a time.

        Reference implementation in the style of Smith-Waterman. For every
        (node, character) cell three kinds of move are considered:

        * "INS"   - same node, previous string position (gap in the graph);
        * "DEL"   - a predecessor node, same string position (gap in string);
        * "MATCH" - a predecessor node, previous string position.

        Returns:
            The ``(string indices, node IDs)`` pair produced by ``backtrack``.

        Raises:
            TypeError: if ``self.sequence`` is not a string.
        """
        if not isinstance(self.sequence, str):
            raise TypeError("Invalid Type")

        nodeIDtoIndex, nodeIndexToID, scores, backStrIdx, backGrphIdx = (
            self.initializeDynamicProgrammingData()
        )

        # Dynamic Programming
        ni = self.graph.nodeiterator()
        for i, node in enumerate(ni()):
            pbase = node.text
            predRows = [p + 1 for p in self.prevIndices(node, nodeIDtoIndex)]
            for j, sbase in enumerate(self.sequence):
                # collect every candidate move, then keep the best one
                candidates = [(scores[i + 1, j] + self._gap, i + 1, j, "INS")]
                for predRow in predRows:
                    candidates.append(
                        (scores[predRow, j + 1] + self._gap, predRow, j + 1, "DEL")
                    )
                    candidates.append(
                        (
                            scores[predRow, j] + self.matchscore(sbase, pbase),
                            predRow,
                            j,
                            "MATCH",
                        )
                    )
                (
                    scores[i + 1, j + 1],
                    backGrphIdx[i + 1, j + 1],
                    backStrIdx[i + 1, j + 1],
                    _,
                ) = max(candidates)

                # local alignment: never let a cell drop below zero
                if not self.globalAlign and scores[i + 1, j + 1] < 0:
                    scores[i + 1, j + 1] = 0.0
                    backGrphIdx[i + 1, j + 1] = -1
                    backStrIdx[i + 1, j + 1] = -1

        return self.backtrack(scores, backStrIdx, backGrphIdx, nodeIndexToID)

    def alignStringToGraphFast(self):
        """Align the string to the graph, vectorising over the string.

        For each node the deletion and match scores for all string positions
        are computed at once with numpy; only the insertion pass is a Python
        loop because each position depends on the one before it.

        Returns:
            The ``(string indices, node IDs)`` pair produced by ``backtrack``.

        Raises:
            TypeError: if ``self.sequence`` is not a string.
        """
        if not isinstance(self.sequence, str):
            raise TypeError("Invalid Type")

        l2 = len(self.sequence)
        seqvec = numpy.array(list(self.sequence))

        nodeIDtoIndex, nodeIndexToID, scores, backStrIdx, backGrphIdx = (
            self.initializeDynamicProgrammingData()
        )
        inserted = numpy.zeros((l2), dtype=bool)

        # Source columns for each move type; identical for every node, so
        # they are built once. A deletion stays in the same column, a match
        # or an insertion comes from the column to the left.
        sameColumn = numpy.arange(1, l2 + 1)
        prevColumn = numpy.arange(l2)

        # having the inner loop as a function improves performance
        # can use Cython, etc on this for significant further improvements
        # can't vectorize this since there's a loop-carried dependency
        # along the string
        def insertions(i, l2, scores, inserted):
            inserted[:] = False
            for j in range(l2):
                insscore = scores[i + 1, j] + self._gap
                if insscore >= scores[i + 1, j + 1]:
                    scores[i + 1, j + 1] = insscore
                    inserted[j] = True

        # Dynamic Programming
        ni = self.graph.nodeiterator()
        for i, node in enumerate(ni()):
            row = i + 1
            predecessors = self.prevIndices(node, nodeIDtoIndex)

            # A match always pairs the string with the *current* node's
            # base, whichever predecessor the path arrives from, so the
            # per-position match points are computed once per node.
            matchpoints = self.matchscoreVec(node.text, seqvec)

            # calculate all best deletions, matches in one go over all
            # predecessors.
            # First calculate for the first predecessor, over all string posns:
            firstRow = predecessors[0] + 1
            deletescore = scores[firstRow, 1:] + self._gap
            bestdelete = numpy.full(l2, firstRow, dtype=numpy.int32)
            matchscore = scores[firstRow, 0:-1] + matchpoints
            bestmatch = numpy.full(l2, firstRow, dtype=numpy.int32)

            # then, the remaining
            for predecessor in predecessors[1:]:
                predRow = predecessor + 1

                newdeletescore = scores[predRow, 1:] + self._gap
                bestdelete = numpy.where(newdeletescore > deletescore, predRow, bestdelete)
                deletescore = numpy.maximum(newdeletescore, deletescore)

                newmatchscore = scores[predRow, 0:-1] + matchpoints
                bestmatch = numpy.where(newmatchscore > matchscore, predRow, bestmatch)
                matchscore = numpy.maximum(newmatchscore, matchscore)

            # choose best options available of match, delete
            deleted = deletescore >= matchscore
            backGrphIdx[row, 1:] = numpy.where(deleted, bestdelete, bestmatch)
            backStrIdx[row, 1:] = numpy.where(deleted, sameColumn, prevColumn)
            scores[row, 1:] = numpy.where(deleted, deletescore, matchscore)

            # insertions: updated in place, don't depend on predecessors
            insertions(i, l2, scores, inserted)
            backGrphIdx[row, 1:] = numpy.where(inserted, row, backGrphIdx[row, 1:])
            backStrIdx[row, 1:] = numpy.where(inserted, prevColumn, backStrIdx[row, 1:])

            # if we're doing local alignment, don't let bad global alignment
            # drag us negative
            if not self.globalAlign:
                positive = scores[row, :] > 0
                backGrphIdx[row, :] = numpy.where(positive, backGrphIdx[row, :], -1)
                backStrIdx[row, :] = numpy.where(positive, backStrIdx[row, :], -1)
                scores[row, :] = numpy.maximum(scores[row, :], 0)

        return self.backtrack(scores, backStrIdx, backGrphIdx, nodeIndexToID)

    def prevIndices(self, node, nodeIDtoIndex):
        """Return the table indices of the predecessors of ``node``.

        A node without predecessors yields ``[-1]``, i.e. "just before the
        graph" (row 0 of the tables once the +1 offset is applied).
        """
        prev = [nodeIDtoIndex[predID] for predID in node.inEdges.keys()]
        # if no predecessors, point to just before the graph
        if not prev:
            prev = [-1]
        return prev

    def initializeDynamicProgrammingData(self):
        """Initialize the dynamic programming tables.

        Returns:
            A 5-tuple ``(nodeIDtoIndex, nodeIndexToID, scores, backStrIdx,
            backGrphIdx)``:

            * ``nodeIDtoIndex`` / ``nodeIndexToID``: node ID <-> position in
              the node iteration order (``nodeIndexToID[-1]`` is ``None``);
            * ``scores``: ``int32`` array of shape ``(nNodes+1, len+1)``;
              row/column 0 represent "before the graph"/"before the string";
            * ``backStrIdx`` / ``backGrphIdx``: zero-filled ``int32`` arrays
              of the same shape holding the back-pointers.
        """
        l1 = self.graph.nNodes
        l2 = len(self.sequence)

        nodeIDtoIndex = {}
        nodeIndexToID = {-1: None}
        # generate a dict of (nodeID) -> (index into nodelist (and thus matrix))
        ni = self.graph.nodeiterator()
        for index, node in enumerate(ni()):
            nodeIDtoIndex[node.ID] = index
            nodeIndexToID[index] = node.ID

        # Dynamic Programming data structures; scores matrix and backtracking
        # matrix
        scores = numpy.zeros((l1 + 1, l2 + 1), dtype=numpy.int32)

        # initialize insertion score
        # if global align, penalty for starting at head != 0
        if self.globalAlign:
            scores[0, :] = numpy.arange(l2 + 1) * self._gap
            ni = self.graph.nodeiterator()
            for index, node in enumerate(ni()):
                prevIdxs = self.prevIndices(node, nodeIDtoIndex)
                best = max(scores[prevIdx + 1, 0] for prevIdx in prevIdxs)
                scores[index + 1, 0] = best + self._gap

        # backtracking matrices
        backStrIdx = numpy.zeros((l1 + 1, l2 + 1), dtype=numpy.int32)
        backGrphIdx = numpy.zeros((l1 + 1, l2 + 1), dtype=numpy.int32)

        return nodeIDtoIndex, nodeIndexToID, scores, backStrIdx, backGrphIdx

    def backtrack(self, scores, backStrIdx, backGrphIdx, nodeIndexToID):
        """Backtrack through the scores and backtrack arrays.

        Local alignment starts from the last cell holding the maximum score
        and stops when the score is no longer positive. Global alignment
        starts in the last column, at the best-scoring node with
        ``outDegree == 0``, and stops at cell (0, 0).

        Returns:
            ``(strindexes, matches)``: parallel lists of sequence indices and
            node IDs (not indices, which depend on ordering); ``None`` marks
            a gap on that side.
        """
        besti, bestj = scores.shape
        besti -= 1
        bestj -= 1
        if not self.globalAlign:
            besti, bestj = numpy.argwhere(scores == numpy.amax(scores))[-1]
        else:
            ni = self.graph.nodeiterator()
            # still have to find best final index to start from
            terminalIndices = [index for (index, node) in enumerate(ni()) if node.outDegree == 0]
            besti = terminalIndices[0] + 1
            bestscore = scores[besti, bestj]
            for i in terminalIndices[1:]:
                score = scores[i + 1, bestj]
                if score > bestscore:
                    bestscore, besti = score, i + 1

        # Collected end-to-start and reversed once at the end, which avoids
        # the quadratic cost of repeatedly inserting at the list head.
        matches = []
        strindexes = []
        while (self.globalAlign or scores[besti, bestj] > 0) and (besti != 0 or bestj != 0):
            nexti, nextj = backGrphIdx[besti, bestj], backStrIdx[besti, bestj]
            curstridx, curnodeidx = bestj - 1, nodeIndexToID[besti - 1]

            # a coordinate that did not move means a gap on that side
            strindexes.append(curstridx if nextj != bestj else None)
            matches.append(curnodeidx if nexti != besti else None)

            besti, bestj = nexti, nextj

        strindexes.reverse()
        matches.reverse()
        return strindexes, matches