Муравьёв Георгий¶

In [122]:
import numpy as np
import matplotlib.pyplot as plt
In [416]:
class SmithWaterman:

  def __init__(self, seqs, alphabet="atgc", match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=1, del_ext=1):
    self.alphabet = {letter : n for n, letter in enumerate(alphabet)}
    self.match_matrix = match_matrix
    self.del_open = del_open
    self.del_ext = del_ext
    self.seqs = seqs
    self.N = len(self.seqs)
    self.alignments = np.full((self.N, self.N), "", dtype=object)
    self.aligners = np.full((self.N, self.N), "", dtype=object)
    self.scores = np.zeros((self.N, self.N))
    self.X = np.empty((self.N, self.N), dtype=object)
    self.Y = np.empty((self.N, self.N), dtype=object)
    self.M = np.empty((self.N, self.N), dtype=object)

  def align(self):
    if self.N > 1:
      for i in range(self.N):
        for j in range(i, self.N):
          self.align_two(i, j)
    else:
      print("At least two sequences are expected")

  def align_two(self, first, second):
    seq1, seq2 = self.seqs[first], self.seqs[second]
    n, m = len(seq1), len(seq2)
    M = np.zeros((n, m))
    pi_M = np.zeros((n, m))
    X = np.zeros((n, m))
    pi_X = np.zeros((n, m))
    Y = np.zeros((n, m))
    pi_Y = np.zeros((n, m))

    for i in range(n):
      for j in range(m):
        M[i, j] = max(M[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                      X[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                      Y[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                      0)
        pi_M[i, j] = np.argmax([M[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                             X[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                             Y[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                             0])
        X[i, j] = max(M[i-1, j] - self.del_open,
                      X[i-1, j] - self.del_ext,
                      -np.inf) #because Y --> X is not allowed
        pi_X[i, j] = np.argmax([M[i-1, j] - self.del_open,
                                X[i-1, j] - self.del_ext,
                                -np.inf]) #because Y --> X is not allowed
        Y[i, j] = max(M[i, j-1] - self.del_open,
                      -np.inf, #because X --> Y is not allowed
                      Y[i, j-1] - self.del_ext)
        pi_Y[i, j] = np.argmax([M[i, j-1] - self.del_open,
                                -np.inf, #because X --> Y is not allowed
                                Y[i, j-1] - self.del_ext])

    i, j = np.unravel_index(np.argmax(M), M.shape)
    score = M[i, j]
    states = [0]
    while i > 0 and j > 0 and score > 0:
      state = states[-1]
      if state == 0:
        pre_state = pi_M[i, j]
        i -= 1
        j -= 1
      if state == 1:
        pre_state = pi_X[i, j]
        i -= 1
      if state == 2:
        pre_state = pi_Y[i, j]
        j -= 1
      if state == 3:
        break
      states.append(pre_state)
    states.reverse()
    seq1_ = list(seq1[i:])[::-1]
    seq2_ = list(seq2[j:])[::-1]
    aligned_seq1 = []
    aligned_seq2 = []
    aligner = []
    #print(states)
    for state in states:
      if state == 0:
        aligned_seq1.append(seq1_.pop())
        aligned_seq2.append(seq2_.pop())
        if aligned_seq1[-1] == aligned_seq2[-1]:
          aligner.append("|")
        else:
          aligner.append(".")
      if state == 1:
        aligned_seq1.append(seq1_.pop())
        aligned_seq2.append("-")
        aligner.append(" ")
      if state == 2:
        aligned_seq1.append("-")
        aligned_seq2.append(seq2_.pop())
        aligner.append(" ")

    self.M[first, second] = M
    self.X[first, second] = X
    self.Y[first, second] = Y
    self.alignments[first, second] = "".join(aligned_seq1)
    self.alignments[second, first] = "".join(aligned_seq2)
    self.aligners[first, second] = "".join(aligner)
    self.scores[first, second] = score

  def set_HMM_params(self, eta=0.5, delta=0.25, epsilon=0.5, tau=0.25, p_letter=None, p_match=None):
    self.eta = eta
    self.delta = delta
    self.epsilon = epsilon
    self.p_letter = p_letter
    self.p_match = p_match

    if p_letter is None:
      self.p_letter = np.ones(len(self.alphabet)).reshape(1,-1)/len(self.alphabet)
    if p_match is None:
      p_match = np.exp(self.match_matrix) * self.p_letter * self.p_letter.T
      self.p_match = p_match / np.sum(p_match)
    if delta is None:
      self.delta = np.exp(-self.del_open)
    if epsilon is None:
      self.epsilon = np.exp(-self.del_ext)

    self.tau = min(tau, 1-2*self.delta, 1-self.epsilon)

  def fb_two(self, first, second, eps=1e-6):

    self.set_HMM_params()
    seq1, seq2 = self.seqs[first], self.seqs[second]
    n, m = len(seq1), len(seq2)
    f_B = np.zeros((n+1, m+1))
    f_X1 = np.zeros((n, m))
    f_Y1 = np.zeros((n, m))
    f_M = np.zeros((n, m))
    f_X = np.zeros((n, m))
    f_Y = np.zeros((n, m))
    f_X2 = np.zeros((n, m))
    f_Y2 = np.zeros((n, m))
    f_E = np.zeros((n+1, m+1))

    f_B[0,0] = 1

    for i in range(n):
      for j in range(m):
        f_X1[i,j] = np.sum([self.p_letter[0,self.alphabet[seq1[0]]] * (1-self.eta) * f_B[i,j],
                            self.p_letter[0,self.alphabet[seq1[i]]] * (1-self.eta) * f_X1[i-1,j]])
        f_Y1[i,j] = np.sum([self.p_letter[0,self.alphabet[seq2[0]]] * self.eta * (1-self.eta) * f_B[i,j],
                            self.p_letter[0,self.alphabet[seq2[j]]] * (1-self.eta) * self.eta * f_X1[i,j-1],
                            self.p_letter[0,self.alphabet[seq2[j]]] * (1-self.eta) * f_Y1[i,j-1]])
        f_M[i,j] = np.sum([self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * self.eta * self.eta * (1-2*self.delta-self.tau) * f_B[i,j],
                           self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * self.eta * self.eta * (1-2*self.delta-self.tau) * f_X1[i-1,j-1],
                           self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * self.eta * (1-2*self.delta-self.tau) * f_Y1[i-1,j-1],
                           self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * (1-2*self.delta-self.tau) * f_M[i-1,j-1],
                           self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * (1-self.epsilon-self.tau) * f_X[i-1,j-1],
                           self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * (1-self.epsilon-self.tau) * f_Y[i-1,j-1]])
        f_X[i,j] = np.sum([self.p_letter[0,self.alphabet[seq1[0]]] * self.eta * self.eta * self.delta * f_B[i,j],
                           self.p_letter[0,self.alphabet[seq1[i]]] * self.eta * self.eta * self.delta * f_X1[i-1,j],
                           self.p_letter[0,self.alphabet[seq1[i]]] * self.eta * self.delta * f_Y1[i-1,j],
                           self.p_letter[0,self.alphabet[seq1[i]]] * self.delta * f_M[i-1,j],
                           self.p_letter[0,self.alphabet[seq1[i]]] * self.epsilon * f_X[i-1,j]])
        f_Y[i,j] = np.sum([self.p_letter[0,self.alphabet[seq2[0]]] * self.eta * self.eta * self.delta * f_B[i,j],
                           self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * self.eta * self.delta * f_X1[i,j-1],
                           self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * self.delta * f_Y1[i,j-1],
                           self.p_letter[0,self.alphabet[seq2[j]]] * self.delta * f_M[i,j-1],
                           self.p_letter[0,self.alphabet[seq2[j]]] * self.epsilon * f_Y[i,j-1]])
        f_X2[i,j] = np.sum([self.p_letter[0,self.alphabet[seq1[0]]] * self.eta * self.eta * self.tau * (1-self.eta) * f_B[i,j],
                            self.p_letter[0,self.alphabet[seq1[i]]] * self.eta * self.eta * self.tau * (1-self.eta) * f_X1[i-1,j],
                            self.p_letter[0,self.alphabet[seq1[i]]] * self.eta * self.tau * (1-self.eta) * f_Y1[i-1,j],
                            self.p_letter[0,self.alphabet[seq1[i]]] * self.tau * (1-self.eta) * f_M[i-1,j],
                            self.p_letter[0,self.alphabet[seq1[i]]] * self.tau * (1-self.eta) * f_X[i-1,j],
                            self.p_letter[0,self.alphabet[seq1[i]]] * self.tau * (1-self.eta) * f_Y[i-1,j],
                            self.p_letter[0,self.alphabet[seq1[i]]] * (1-self.eta) * f_X2[i-1, j]])
        f_Y2[i,j] = np.sum([self.p_letter[0,self.alphabet[seq2[0]]] * self.eta * self.eta * self.tau * self.eta * (1-self.eta) * f_B[i,j],
                            self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * self.eta * self.tau * self.eta * (1-self.eta) * f_X1[i,j-1],
                            self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * self.tau * self.eta * (1-self.eta) * f_Y1[i,j-1],
                            self.p_letter[0,self.alphabet[seq2[j]]] * self.tau * self.eta * (1-self.eta) * f_M[i,j-1],
                            self.p_letter[0,self.alphabet[seq2[j]]] * self.tau * self.eta * (1-self.eta) * f_X[i,j-1],
                            self.p_letter[0,self.alphabet[seq2[j]]] * self.tau * self.eta * (1-self.eta) * f_Y[i,j-1],
                            self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * (1-self.eta) * f_X2[i, j-1],
                            self.p_letter[0,self.alphabet[seq2[j]]] * (1-self.eta) * f_Y2[i, j-1]])

    f_E[n, m] = np.sum([#self.eta * self.eta * self.tau * self.eta * self.eta * f_B[0,0],
                        self.eta * self.eta * self.tau * self.eta * self.eta * f_X1[n-1,m-1],
                        self.eta * self.tau * self.eta * self.eta * f_Y1[n-1,m-1],
                        self.tau * self.eta * self.eta * f_M[n-1,m-1],
                        self.tau * self.eta * self.eta * f_X[n-1,m-1],
                        self.tau * self.eta * self.eta * f_Y[n-1,m-1],
                        self.eta * self.eta * f_X2[n-1,m-1],
                        self.eta * f_Y2[n-1,m-1]])

    b_E = np.zeros((n+1,m+1))
    b_X1 = np.zeros((n, m))
    b_Y1 = np.zeros((n, m))
    b_M = np.zeros((n, m))
    b_X = np.zeros((n, m))
    b_Y = np.zeros((n, m))
    b_X2 = np.zeros((n, m))
    b_Y2 = np.zeros((n, m))
    b_B = np.zeros((n+1,m+1))

    b_E[n,m] = 1
    b_Y2[n-1,m-1] = self.eta * b_E[n,m]
    b_X2[n-1,m-1] = self.eta * self.eta * b_E[n,m]
    b_Y[n-1,m-1] = self.eta * self.eta * self.tau * b_E[n,m]
    b_X[n-1,m-1] = self.eta * self.eta * self.tau * b_E[n,m]
    b_M[n-1,m-1] = self.eta * self.eta * self.tau * b_E[n,m]
    b_Y1[n-1,m-1] = self.eta * self.eta * self.tau * self.eta * b_E[n,m]
    b_X1[n-1,m-1] = self.eta * self.eta * self.tau * self.eta * self.eta * b_E[n,m]

    for i in range(-1, -1-n, -1):
      for j in range(-1, -1-m, -1):
        if i == -1 and j == -1:
          continue
        b_Y2[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.p_letter[0,self.alphabet[seq2[j+1]]]])
        b_X2[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
                            b_X2[i+1,j] * (1-self.eta) * self.p_letter[0,self.alphabet[seq1[i+1]]]])
        b_Y[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.p_letter[0,self.alphabet[seq2[j+1]]],
                           b_X2[i+1,j] * (1-self.eta) * self.tau * self.p_letter[0,self.alphabet[seq1[i+1]]],
                           b_Y[i,j+1] * self.epsilon * self.p_letter[0,self.alphabet[seq2[j+1]]]])
        b_X[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.p_letter[0,self.alphabet[seq2[j+1]]],
                           b_X2[i+1,j] * (1-self.eta) * self.tau * self.p_letter[0,self.alphabet[seq1[i+1]]],
                           b_X[i+1,j] * self.epsilon * self.p_letter[0,self.alphabet[seq1[i+1]]]])
        b_M[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.p_letter[0,self.alphabet[seq2[j+1]]],
                           b_X2[i+1,j] * (1-self.eta) * self.tau * self.p_letter[0,self.alphabet[seq1[i+1]]],
                           b_Y[i,j+1] * self.delta * self.p_letter[0,self.alphabet[seq2[j+1]]],
                           b_X[i+1,j] * self.delta * self.p_letter[0,self.alphabet[seq1[i+1]]],
                           b_M[i+1,j+1] * (1-2*self.delta-self.tau) * self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]]])
        b_Y1[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
                            b_X2[i+1,j] * (1-self.eta) * self.tau * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
                            b_Y[i,j+1] * self.delta * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
                            b_X[i+1,j] * self.delta * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
                            b_M[i+1,j+1] * (1-2*self.delta-self.tau) * self.eta * self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                            b_Y1[i,j+1] * (1-self.eta) * self.p_letter[0,self.alphabet[seq1[i+1]]]])
        b_X1[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.eta * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
                            b_X2[i+1,j] * (1-self.eta) * self.tau * self.eta * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
                            b_Y[i,j+1] * self.delta * self.eta * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
                            b_X[i+1,j] * self.delta * self.eta * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
                            b_M[i+1,j+1] * (1-2*self.delta-self.tau) * self.eta * self.eta * self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                            b_Y1[i,j+1] * (1-self.eta) * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
                            b_X1[i+1,j] * (1-self.eta) * self.p_letter[0,self.alphabet[seq1[i+1]]]])
    b_B[0,0] = np.sum([#b_E[n,m] * self.eta * self.eta * self.tau * self.eta * self.eta,
                       b_Y2[0,0] * (1-self.eta) * self.eta * self.tau * self.eta * self.eta * self.p_letter[0,self.alphabet[seq2[0]]],
                       b_X2[0,0] *(1-self.eta) * self.tau * self.eta * self.eta * self.p_letter[0,self.alphabet[seq1[0]]],
                       b_Y[0,0] * self.delta * self.eta * self.eta * self.p_letter[0,self.alphabet[seq2[0]]],
                       b_X[0,0] * self.delta * self.eta * self.eta * self.p_letter[0,self.alphabet[seq1[0]]],
                       b_M[0,0] * (1-2*self.delta-self.tau) * self.eta * self.eta * self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
                       b_Y1[0,0] * (1-self.eta) * self.eta * self.p_letter[0,self.alphabet[seq1[0]]],
                       b_X1[0,0] * (1-self.eta) * self.p_letter[0,self.alphabet[seq1[0]]]])

    #cheking of computed values
    assert f_E[n,m] - b_B[0,0] < eps, "Something is wrong and f_E(n,m) != b_B(0,0)"


    self.f_M = f_M
    self.b_M = b_M
    self.f_X = f_X
    self.b_X = b_X
    self.f_Y = f_Y
    self.b_Y = b_Y
    self.f_E = f_E
    self.f_X1 = f_X1
    self.b_X1 = b_X1
    self.f_X2 = f_X2
    self.b_X2 = b_X2
    self.f_Y1 = f_Y1
    self.b_Y1 = b_Y1
    self.f_Y2 = f_Y2
    self.b_Y2= b_Y2

    self.p_M = (f_M * b_M) / f_E[n,m]
    self.p_X = (f_X * b_X) / f_E[n,m]
    self.p_Y = (f_Y * b_Y) / f_E[n,m]


  def print_alignment(self, i, j):
    seq1 = self.alignments[i, j]
    seq2 = self.alignments[j, i]
    aligner = self.aligners[i, j]
    score = self.scores[i, j]
    print(f"score = {score}\n{seq1}\n{aligner}\n{seq2}")

  def print_all_alignments(self):
    N = len(self.seqs)
    if N > 1:
      for i in range(N):
        for j in range(i, N):
          print(f"Alignment {i} with {j}")
          self.print_alignment(i, j)
          print()
    else:
      print("At least two sequences are expected")

Тестовые последовательности¶

In [434]:
seqs = ["acttgcgcctcgc",
        "acgggcgcctcgc", #mismatchs
        "actcgcctcgc", #deletion
        "acttgagtcgcctcgc"] #insertion

Параметры: match=1, mismatch=1, del=1¶

In [435]:
SW = SmithWaterman(seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=1, del_ext=1)
SW.align()
SW.print_all_alignments()
Alignment 0 with 0
score = 13.0
acttgcgcctcgc
|||||||||||||
acttgcgcctcgc

Alignment 0 with 1
score = 9.0
acttgcgcctcgc
||..|||||||||
acgggcgcctcgc

Alignment 0 with 2
score = 9.0
acttgcgcctcgc
|| | ||||||||
ac-t-cgcctcgc

Alignment 0 with 3
score = 10.0
actt--g-cgcctcgc
||||  | ||||||||
acttgagtcgcctcgc

Alignment 1 with 1
score = 13.0
acgggcgcctcgc
|||||||||||||
acgggcgcctcgc

Alignment 1 with 2
score = 8.0
ggcgcctcg
..|||||||
ctcgcctcg

Alignment 1 with 3
score = 8.0
cgggcgcctcg
..|.|||||||
gagtcgcctcg

Alignment 2 with 2
score = 11.0
actcgcctcgc
|||||||||||
actcgcctcgc

Alignment 2 with 3
score = 9.0
actcgcctcgc
|.|||||||||
agtcgcctcgc

Alignment 3 with 3
score = 16.0
acttgagtcgcctcgc
||||||||||||||||
acttgagtcgcctcgc

Выравнивания последовательностей соответсвуют ожидаемым

Параметры: match=1, mismatch=1, del_open=5, del_ext=1¶

In [436]:
SW = SmithWaterman(seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=5, del_ext=1)
SW.align()
SW.print_all_alignments()
Alignment 0 with 0
score = 13.0
acttgcgcctcgc
|||||||||||||
acttgcgcctcgc

Alignment 0 with 1
score = 9.0
acttgcgcctcgc
||..|||||||||
acgggcgcctcgc

Alignment 0 with 2
score = 8.0
tgcgcctcg
..|||||||
ctcgcctcg

Alignment 0 with 3
score = 8.0
tgcgcctcg
..|||||||
gtcgcctcg

Alignment 1 with 1
score = 13.0
acgggcgcctcgc
|||||||||||||
acgggcgcctcgc

Alignment 1 with 2
score = 8.0
ggcgcctcg
..|||||||
ctcgcctcg

Alignment 1 with 3
score = 8.0
cgggcgcctcg
..|.|||||||
gagtcgcctcg

Alignment 2 with 2
score = 11.0
actcgcctcgc
|||||||||||
actcgcctcgc

Alignment 2 with 3
score = 9.0
actcgcctcgc
|.|||||||||
agtcgcctcgc

Alignment 3 with 3
score = 16.0
acttgagtcgcctcgc
||||||||||||||||
acttgagtcgcctcgc

Из-за очень высокого штрафа за открытие делеции оптимальными оказываются короткие выравнивания с мисмэтчами, а не длинные с делециями.

Распределение весов выравниваний¶

In [450]:
alphabet = "atgc"
seqs = []
np.random.seed(42)
for i in range(20):
  seqs.append("".join(np.random.choice(list(alphabet), 200)))
SW = SmithWaterman(seqs=seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=1, del_ext=1)
SW.align()
plt.hist(SW.scores[np.triu_indices(20,1)])
Out[450]:
(array([ 8., 18., 17., 32., 29., 58., 15.,  8.,  2.,  3.]),
 array([20. , 22.2, 24.4, 26.6, 28.8, 31. , 33.2, 35.4, 37.6, 39.8, 42. ]),
 <BarContainer object of 10 artists>)
In [451]:
SW = SmithWaterman(seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=5, del_ext=1)
SW.align()
plt.hist(SW.scores[np.triu_indices(20,1)])
Out[451]:
(array([ 1., 40., 51., 56.,  0., 31.,  8.,  2.,  0.,  1.]),
 array([ 6. ,  6.8,  7.6,  8.4,  9.2, 10. , 10.8, 11.6, 12.4, 13.2, 14. ]),
 <BarContainer object of 10 artists>)

В случае афинных штрафов выравнивания случайных последовательностей имеют меньший вес

Forward-backward algorythm¶

In [455]:
seqs = ["aactctcgtacgtacgtgcgggctacg"]
SW = SmithWaterman(seqs=seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=1, del_ext=1)
SW.fb_two(0,0)
In [456]:
plt.imshow(SW.p_M)
plt.colorbar()
Out[456]:
<matplotlib.colorbar.Colorbar at 0x7d311b87ed40>

По всей видимости в реализации алгоритма есть какая-то ошибка, но я не могу её найти. Как ещё объяснить почему даже выравнивание последовательности против самой себя имеет очень маленькую вероятность я не могу. Ещё я не смог разобраться откуда берутся параметры для HMM.