Spaces:
Build error
Build error
| """ | |
| Adapted from Jonathan Dursi | |
| https://github.com/ljdursi/poapy | |
| """ | |
| import numpy | |
| class SeqGraphAlignment(object): | |
| __matchscore = 1 | |
| __mismatchscore = -2 | |
| __gap = -1 | |
| def __init__( | |
| self, | |
| sequence, | |
| graph, | |
| fastMethod=True, | |
| globalAlign=False, | |
| matchscore=__matchscore, | |
| mismatchscore=__mismatchscore, | |
| gapscore=__gap, | |
| *args, | |
| **kwargs, | |
| ): | |
| self._mismatchscore = mismatchscore | |
| self._matchscore = matchscore | |
| self._gap = gapscore | |
| self.sequence = sequence | |
| self.graph = graph | |
| self.stringidxs = None | |
| self.nodeidxs = None | |
| self.globalAlign = globalAlign | |
| if fastMethod: | |
| matches = self.alignStringToGraphFast(*args, **kwargs) | |
| else: | |
| matches = self.alignStringToGraphSimple(*args, **kwargs) | |
| self.stringidxs, self.nodeidxs = matches | |
| def alignmentStrings(self): | |
| return "".join( | |
| self.sequence[i] if i is not None else "-" for i in self.stringidxs | |
| ), "".join(self.graph.nodedict[j].text if j is not None else "-" for j in self.nodeidxs) | |
| def matchscore(self, c1, c2): | |
| if c1 == c2: | |
| return self._matchscore | |
| else: | |
| return self._mismatchscore | |
| def matchscoreVec(self, c, v): | |
| return numpy.where(v == c, self._matchscore, self._mismatchscore) | |
| def alignStringToGraphSimple(self): | |
| """Align string to graph, following same approach as smith waterman | |
| example""" | |
| if type(self.sequence) is not str: | |
| raise TypeError("Invalid Type") | |
| nodeIDtoIndex, nodeIndexToID, scores, backStrIdx, backGrphIdx = ( | |
| self.initializeDynamicProgrammingData() | |
| ) | |
| # Dynamic Programming | |
| ni = self.graph.nodeiterator() | |
| for i, node in enumerate(ni()): | |
| pbase = node.text | |
| for j, sbase in enumerate(self.sequence): | |
| # add all candidates to a list, pick the best | |
| candidates = [(scores[i + 1, j] + self._gap, i + 1, j, "INS")] | |
| for predIndex in self.prevIndices(node, nodeIDtoIndex): | |
| candidates += [ | |
| (scores[predIndex + 1, j + 1] + self._gap, predIndex + 1, j + 1, "DEL") | |
| ] | |
| candidates += [ | |
| ( | |
| scores[predIndex + 1, j] + self.matchscore(sbase, pbase), | |
| predIndex + 1, | |
| j, | |
| "MATCH", | |
| ) | |
| ] | |
| ( | |
| scores[i + 1, j + 1], | |
| backGrphIdx[i + 1, j + 1], | |
| backStrIdx[i + 1, j + 1], | |
| movetype, | |
| ) = max(candidates) | |
| if not self.globalAlign and scores[i + 1, j + 1] < 0: | |
| scores[i + 1, j + 1] = 0.0 | |
| backGrphIdx[i + 1, j + 1] = -1 | |
| backStrIdx[i + 1, j + 1] = -1 | |
| return self.backtrack(scores, backStrIdx, backGrphIdx, nodeIndexToID) | |
| def alignStringToGraphFast(self): | |
| """Align string to graph - using numpy to vectorize across the string | |
| at each iteration.""" | |
| if type(self.sequence) is not str: | |
| raise TypeError("Invalid Type") | |
| l2 = len(self.sequence) | |
| seqvec = numpy.array(list(self.sequence)) | |
| nodeIDtoIndex, nodeIndexToID, scores, backStrIdx, backGrphIdx = ( | |
| self.initializeDynamicProgrammingData() | |
| ) | |
| inserted = numpy.zeros((l2), dtype=bool) | |
| # having the inner loop as a function improves performance | |
| # can use Cython, etc on this for significant further improvements | |
| # can't vectorize this since there's a loop-carried dependency | |
| # along the string | |
| def insertions(i, l2, scores, inserted): | |
| inserted[:] = False | |
| for j in range(l2): | |
| insscore = scores[i + 1, j] + self._gap | |
| if insscore >= scores[i + 1, j + 1]: | |
| scores[i + 1, j + 1] = insscore | |
| inserted[j] = True | |
| # Dynamic Programming | |
| ni = self.graph.nodeiterator() | |
| for i, node in enumerate(ni()): | |
| gbase = node.text | |
| predecessors = self.prevIndices(node, nodeIDtoIndex) | |
| # calculate all best deletions, matches in one go over all | |
| # predecessors. | |
| # First calculate for the first predecessor, over all string posns: | |
| deletescore = scores[predecessors[0] + 1, 1:] + self._gap | |
| bestdelete = numpy.zeros((l2), dtype=numpy.int32) + predecessors[0] + 1 | |
| matchpoints = self.matchscoreVec(gbase, seqvec) | |
| matchscore = scores[predecessors[0] + 1, 0:-1] + matchpoints | |
| bestmatch = numpy.zeros((l2), dtype=numpy.int32) + predecessors[0] + 1 | |
| # then, the remaining | |
| for predecessor in predecessors[1:]: | |
| newdeletescore = scores[predecessor + 1, 1:] + self._gap | |
| bestdelete = numpy.where(newdeletescore > deletescore, predecessor + 1, bestdelete) | |
| deletescore = numpy.maximum(newdeletescore, deletescore) | |
| gbase = self.graph.nodeIdxToBase(predecessor) | |
| matchpoints = self.matchscoreVec(gbase, seqvec) | |
| newmatchscore = scores[predecessor + 1, 0:-1] + matchpoints | |
| bestmatch = numpy.where(newmatchscore > matchscore, predecessor + 1, bestmatch) | |
| matchscore = numpy.maximum(newmatchscore, matchscore) | |
| # choose best options available of match, delete | |
| deleted = deletescore >= matchscore | |
| backGrphIdx[i + 1, 1:] = numpy.where(deleted, bestdelete, bestmatch) | |
| backStrIdx[i + 1, 1:] = numpy.where( | |
| deleted, numpy.arange(1, l2 + 1), numpy.arange(0, l2) | |
| ) | |
| scores[i + 1, 1:] = numpy.where(deleted, deletescore, matchscore) | |
| # insertions: updated in place, don't depend on predecessors | |
| insertions(i, l2, scores, inserted) | |
| backGrphIdx[i + 1, 1:] = numpy.where(inserted, i + 1, backGrphIdx[i + 1, 1:]) | |
| backStrIdx[i + 1, 1:] = numpy.where(inserted, numpy.arange(l2), backStrIdx[i + 1, 1:]) | |
| # if we're doing local alignment, don't let bad global alignment | |
| # drag us negative | |
| if not self.globalAlign: | |
| backGrphIdx[i + 1, :] = numpy.where(scores[i + 1, :] > 0, backGrphIdx[i + 1, :], -1) | |
| backStrIdx[i + 1, :] = numpy.where(scores[i + 1, :] > 0, backStrIdx[i + 1, :], -1) | |
| scores[i + 1, :] = numpy.maximum(scores[i + 1, :], 0) | |
| return self.backtrack(scores, backStrIdx, backGrphIdx, nodeIndexToID) | |
| def prevIndices(self, node, nodeIDtoIndex): | |
| """Return a list of the previous dynamic programming table indices | |
| corresponding to predecessors of the current node.""" | |
| prev = [nodeIDtoIndex[predID] for predID in list(node.inEdges.keys())] | |
| # if no predecessors, point to just before the graph | |
| if not prev: | |
| prev = [-1] | |
| return prev | |
| def initializeDynamicProgrammingData(self): | |
| """Initalize the dynamic programming tables: | |
| - set up scores array | |
| - set up backtracking array | |
| - create index to Node ID table and vice versa""" | |
| l1 = self.graph.nNodes | |
| l2 = len(self.sequence) | |
| nodeIDtoIndex = {} | |
| nodeIndexToID = {-1: None} | |
| # generate a dict of (nodeID) -> (index into nodelist (and thus matrix)) | |
| ni = self.graph.nodeiterator() | |
| for index, node in enumerate(ni()): | |
| nodeIDtoIndex[node.ID] = index | |
| nodeIndexToID[index] = node.ID | |
| # Dynamic Programming data structures; scores matrix and backtracking | |
| # matrix | |
| scores = numpy.zeros((l1 + 1, l2 + 1), dtype=numpy.int32) | |
| # initialize insertion score | |
| # if global align, penalty for starting at head != 0 | |
| if self.globalAlign: | |
| scores[0, :] = numpy.arange(l2 + 1) * self._gap | |
| ni = self.graph.nodeiterator() | |
| for index, node in enumerate(ni()): | |
| prevIdxs = self.prevIndices(node, nodeIDtoIndex) | |
| best = scores[prevIdxs[0] + 1, 0] | |
| for prevIdx in prevIdxs: | |
| best = max(best, scores[prevIdx + 1, 0]) | |
| scores[index + 1, 0] = best + self._gap | |
| # backtracking matrices | |
| backStrIdx = numpy.zeros((l1 + 1, l2 + 1), dtype=numpy.int32) | |
| backGrphIdx = numpy.zeros((l1 + 1, l2 + 1), dtype=numpy.int32) | |
| return nodeIDtoIndex, nodeIndexToID, scores, backStrIdx, backGrphIdx | |
| def backtrack(self, scores, backStrIdx, backGrphIdx, nodeIndexToID): | |
| """Backtrack through the scores and backtrack arrays. | |
| Return a list of sequence indices and node IDs (not indices, which | |
| depend on ordering).""" | |
| besti, bestj = scores.shape | |
| besti -= 1 | |
| bestj -= 1 | |
| if not self.globalAlign: | |
| besti, bestj = numpy.argwhere(scores == numpy.amax(scores))[-1] | |
| else: | |
| ni = self.graph.nodeiterator() | |
| # still have to find best final index to start from | |
| terminalIndices = [index for (index, node) in enumerate(ni()) if node.outDegree == 0] | |
| print(terminalIndices) | |
| besti = terminalIndices[0] + 1 | |
| bestscore = scores[besti, bestj] | |
| for i in terminalIndices[1:]: | |
| score = scores[i + 1, bestj] | |
| if score > bestscore: | |
| bestscore, besti = score, i + 1 | |
| matches = [] | |
| strindexes = [] | |
| while (self.globalAlign or scores[besti, bestj] > 0) and (besti != 0 or bestj != 0): | |
| nexti, nextj = backGrphIdx[besti, bestj], backStrIdx[besti, bestj] | |
| curstridx, curnodeidx = bestj - 1, nodeIndexToID[besti - 1] | |
| strindexes.insert(0, curstridx if nextj != bestj else None) | |
| matches.insert(0, curnodeidx if nexti != besti else None) | |
| besti, bestj = nexti, nextj | |
| return strindexes, matches | |