import numpy as np
import matplotlib.pyplot as plt
class SmithWaterman:
def __init__(self, seqs, alphabet="atgc", match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=1, del_ext=1):
self.alphabet = {letter : n for n, letter in enumerate(alphabet)}
self.match_matrix = match_matrix
self.del_open = del_open
self.del_ext = del_ext
self.seqs = seqs
self.N = len(self.seqs)
self.alignments = np.full((self.N, self.N), "", dtype=object)
self.aligners = np.full((self.N, self.N), "", dtype=object)
self.scores = np.zeros((self.N, self.N))
self.X = np.empty((self.N, self.N), dtype=object)
self.Y = np.empty((self.N, self.N), dtype=object)
self.M = np.empty((self.N, self.N), dtype=object)
def align(self):
if self.N > 1:
for i in range(self.N):
for j in range(i, self.N):
self.align_two(i, j)
else:
print("At least two sequences are expected")
def align_two(self, first, second):
seq1, seq2 = self.seqs[first], self.seqs[second]
n, m = len(seq1), len(seq2)
M = np.zeros((n, m))
pi_M = np.zeros((n, m))
X = np.zeros((n, m))
pi_X = np.zeros((n, m))
Y = np.zeros((n, m))
pi_Y = np.zeros((n, m))
for i in range(n):
for j in range(m):
M[i, j] = max(M[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
X[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
Y[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
0)
pi_M[i, j] = np.argmax([M[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
X[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
Y[i-1, j-1] + self.match_matrix[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
0])
X[i, j] = max(M[i-1, j] - self.del_open,
X[i-1, j] - self.del_ext,
-np.inf) #because Y --> X is not allowed
pi_X[i, j] = np.argmax([M[i-1, j] - self.del_open,
X[i-1, j] - self.del_ext,
-np.inf]) #because Y --> X is not allowed
Y[i, j] = max(M[i, j-1] - self.del_open,
-np.inf, #because X --> Y is not allowed
Y[i, j-1] - self.del_ext)
pi_Y[i, j] = np.argmax([M[i, j-1] - self.del_open,
-np.inf, #because X --> Y is not allowed
Y[i, j-1] - self.del_ext])
i, j = np.unravel_index(np.argmax(M), M.shape)
score = M[i, j]
states = [0]
while i > 0 and j > 0 and score > 0:
state = states[-1]
if state == 0:
pre_state = pi_M[i, j]
i -= 1
j -= 1
if state == 1:
pre_state = pi_X[i, j]
i -= 1
if state == 2:
pre_state = pi_Y[i, j]
j -= 1
if state == 3:
break
states.append(pre_state)
states.reverse()
seq1_ = list(seq1[i:])[::-1]
seq2_ = list(seq2[j:])[::-1]
aligned_seq1 = []
aligned_seq2 = []
aligner = []
#print(states)
for state in states:
if state == 0:
aligned_seq1.append(seq1_.pop())
aligned_seq2.append(seq2_.pop())
if aligned_seq1[-1] == aligned_seq2[-1]:
aligner.append("|")
else:
aligner.append(".")
if state == 1:
aligned_seq1.append(seq1_.pop())
aligned_seq2.append("-")
aligner.append(" ")
if state == 2:
aligned_seq1.append("-")
aligned_seq2.append(seq2_.pop())
aligner.append(" ")
self.M[first, second] = M
self.X[first, second] = X
self.Y[first, second] = Y
self.alignments[first, second] = "".join(aligned_seq1)
self.alignments[second, first] = "".join(aligned_seq2)
self.aligners[first, second] = "".join(aligner)
self.scores[first, second] = score
def set_HMM_params(self, eta=0.5, delta=0.25, epsilon=0.5, tau=0.25, p_letter=None, p_match=None):
self.eta = eta
self.delta = delta
self.epsilon = epsilon
self.p_letter = p_letter
self.p_match = p_match
if p_letter is None:
self.p_letter = np.ones(len(self.alphabet)).reshape(1,-1)/len(self.alphabet)
if p_match is None:
p_match = np.exp(self.match_matrix) * self.p_letter * self.p_letter.T
self.p_match = p_match / np.sum(p_match)
if delta is None:
self.delta = np.exp(-self.del_open)
if epsilon is None:
self.epsilon = np.exp(-self.del_ext)
self.tau = min(tau, 1-2*self.delta, 1-self.epsilon)
def fb_two(self, first, second, eps=1e-6):
self.set_HMM_params()
seq1, seq2 = self.seqs[first], self.seqs[second]
n, m = len(seq1), len(seq2)
f_B = np.zeros((n+1, m+1))
f_X1 = np.zeros((n, m))
f_Y1 = np.zeros((n, m))
f_M = np.zeros((n, m))
f_X = np.zeros((n, m))
f_Y = np.zeros((n, m))
f_X2 = np.zeros((n, m))
f_Y2 = np.zeros((n, m))
f_E = np.zeros((n+1, m+1))
f_B[0,0] = 1
for i in range(n):
for j in range(m):
f_X1[i,j] = np.sum([self.p_letter[0,self.alphabet[seq1[0]]] * (1-self.eta) * f_B[i,j],
self.p_letter[0,self.alphabet[seq1[i]]] * (1-self.eta) * f_X1[i-1,j]])
f_Y1[i,j] = np.sum([self.p_letter[0,self.alphabet[seq2[0]]] * self.eta * (1-self.eta) * f_B[i,j],
self.p_letter[0,self.alphabet[seq2[j]]] * (1-self.eta) * self.eta * f_X1[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * (1-self.eta) * f_Y1[i,j-1]])
f_M[i,j] = np.sum([self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * self.eta * self.eta * (1-2*self.delta-self.tau) * f_B[i,j],
self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * self.eta * self.eta * (1-2*self.delta-self.tau) * f_X1[i-1,j-1],
self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * self.eta * (1-2*self.delta-self.tau) * f_Y1[i-1,j-1],
self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * (1-2*self.delta-self.tau) * f_M[i-1,j-1],
self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * (1-self.epsilon-self.tau) * f_X[i-1,j-1],
self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]] * (1-self.epsilon-self.tau) * f_Y[i-1,j-1]])
f_X[i,j] = np.sum([self.p_letter[0,self.alphabet[seq1[0]]] * self.eta * self.eta * self.delta * f_B[i,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.eta * self.eta * self.delta * f_X1[i-1,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.eta * self.delta * f_Y1[i-1,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.delta * f_M[i-1,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.epsilon * f_X[i-1,j]])
f_Y[i,j] = np.sum([self.p_letter[0,self.alphabet[seq2[0]]] * self.eta * self.eta * self.delta * f_B[i,j],
self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * self.eta * self.delta * f_X1[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * self.delta * f_Y1[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * self.delta * f_M[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * self.epsilon * f_Y[i,j-1]])
f_X2[i,j] = np.sum([self.p_letter[0,self.alphabet[seq1[0]]] * self.eta * self.eta * self.tau * (1-self.eta) * f_B[i,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.eta * self.eta * self.tau * (1-self.eta) * f_X1[i-1,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.eta * self.tau * (1-self.eta) * f_Y1[i-1,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.tau * (1-self.eta) * f_M[i-1,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.tau * (1-self.eta) * f_X[i-1,j],
self.p_letter[0,self.alphabet[seq1[i]]] * self.tau * (1-self.eta) * f_Y[i-1,j],
self.p_letter[0,self.alphabet[seq1[i]]] * (1-self.eta) * f_X2[i-1, j]])
f_Y2[i,j] = np.sum([self.p_letter[0,self.alphabet[seq2[0]]] * self.eta * self.eta * self.tau * self.eta * (1-self.eta) * f_B[i,j],
self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * self.eta * self.tau * self.eta * (1-self.eta) * f_X1[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * self.tau * self.eta * (1-self.eta) * f_Y1[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * self.tau * self.eta * (1-self.eta) * f_M[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * self.tau * self.eta * (1-self.eta) * f_X[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * self.tau * self.eta * (1-self.eta) * f_Y[i,j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * self.eta * (1-self.eta) * f_X2[i, j-1],
self.p_letter[0,self.alphabet[seq2[j]]] * (1-self.eta) * f_Y2[i, j-1]])
f_E[n, m] = np.sum([#self.eta * self.eta * self.tau * self.eta * self.eta * f_B[0,0],
self.eta * self.eta * self.tau * self.eta * self.eta * f_X1[n-1,m-1],
self.eta * self.tau * self.eta * self.eta * f_Y1[n-1,m-1],
self.tau * self.eta * self.eta * f_M[n-1,m-1],
self.tau * self.eta * self.eta * f_X[n-1,m-1],
self.tau * self.eta * self.eta * f_Y[n-1,m-1],
self.eta * self.eta * f_X2[n-1,m-1],
self.eta * f_Y2[n-1,m-1]])
b_E = np.zeros((n+1,m+1))
b_X1 = np.zeros((n, m))
b_Y1 = np.zeros((n, m))
b_M = np.zeros((n, m))
b_X = np.zeros((n, m))
b_Y = np.zeros((n, m))
b_X2 = np.zeros((n, m))
b_Y2 = np.zeros((n, m))
b_B = np.zeros((n+1,m+1))
b_E[n,m] = 1
b_Y2[n-1,m-1] = self.eta * b_E[n,m]
b_X2[n-1,m-1] = self.eta * self.eta * b_E[n,m]
b_Y[n-1,m-1] = self.eta * self.eta * self.tau * b_E[n,m]
b_X[n-1,m-1] = self.eta * self.eta * self.tau * b_E[n,m]
b_M[n-1,m-1] = self.eta * self.eta * self.tau * b_E[n,m]
b_Y1[n-1,m-1] = self.eta * self.eta * self.tau * self.eta * b_E[n,m]
b_X1[n-1,m-1] = self.eta * self.eta * self.tau * self.eta * self.eta * b_E[n,m]
for i in range(-1, -1-n, -1):
for j in range(-1, -1-m, -1):
if i == -1 and j == -1:
continue
b_Y2[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.p_letter[0,self.alphabet[seq2[j+1]]]])
b_X2[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X2[i+1,j] * (1-self.eta) * self.p_letter[0,self.alphabet[seq1[i+1]]]])
b_Y[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X2[i+1,j] * (1-self.eta) * self.tau * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_Y[i,j+1] * self.epsilon * self.p_letter[0,self.alphabet[seq2[j+1]]]])
b_X[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X2[i+1,j] * (1-self.eta) * self.tau * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_X[i+1,j] * self.epsilon * self.p_letter[0,self.alphabet[seq1[i+1]]]])
b_M[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X2[i+1,j] * (1-self.eta) * self.tau * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_Y[i,j+1] * self.delta * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X[i+1,j] * self.delta * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_M[i+1,j+1] * (1-2*self.delta-self.tau) * self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]]])
b_Y1[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X2[i+1,j] * (1-self.eta) * self.tau * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_Y[i,j+1] * self.delta * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X[i+1,j] * self.delta * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_M[i+1,j+1] * (1-2*self.delta-self.tau) * self.eta * self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
b_Y1[i,j+1] * (1-self.eta) * self.p_letter[0,self.alphabet[seq1[i+1]]]])
b_X1[i,j] = np.sum([b_Y2[i,j+1] * (1-self.eta) * self.eta * self.tau * self.eta * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X2[i+1,j] * (1-self.eta) * self.tau * self.eta * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_Y[i,j+1] * self.delta * self.eta * self.eta * self.p_letter[0,self.alphabet[seq2[j+1]]],
b_X[i+1,j] * self.delta * self.eta * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_M[i+1,j+1] * (1-2*self.delta-self.tau) * self.eta * self.eta * self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
b_Y1[i,j+1] * (1-self.eta) * self.eta * self.p_letter[0,self.alphabet[seq1[i+1]]],
b_X1[i+1,j] * (1-self.eta) * self.p_letter[0,self.alphabet[seq1[i+1]]]])
b_B[0,0] = np.sum([#b_E[n,m] * self.eta * self.eta * self.tau * self.eta * self.eta,
b_Y2[0,0] * (1-self.eta) * self.eta * self.tau * self.eta * self.eta * self.p_letter[0,self.alphabet[seq2[0]]],
b_X2[0,0] *(1-self.eta) * self.tau * self.eta * self.eta * self.p_letter[0,self.alphabet[seq1[0]]],
b_Y[0,0] * self.delta * self.eta * self.eta * self.p_letter[0,self.alphabet[seq2[0]]],
b_X[0,0] * self.delta * self.eta * self.eta * self.p_letter[0,self.alphabet[seq1[0]]],
b_M[0,0] * (1-2*self.delta-self.tau) * self.eta * self.eta * self.p_match[self.alphabet[seq1[i]], self.alphabet[seq2[j]]],
b_Y1[0,0] * (1-self.eta) * self.eta * self.p_letter[0,self.alphabet[seq1[0]]],
b_X1[0,0] * (1-self.eta) * self.p_letter[0,self.alphabet[seq1[0]]]])
#cheking of computed values
assert f_E[n,m] - b_B[0,0] < eps, "Something is wrong and f_E(n,m) != b_B(0,0)"
self.f_M = f_M
self.b_M = b_M
self.f_X = f_X
self.b_X = b_X
self.f_Y = f_Y
self.b_Y = b_Y
self.f_E = f_E
self.f_X1 = f_X1
self.b_X1 = b_X1
self.f_X2 = f_X2
self.b_X2 = b_X2
self.f_Y1 = f_Y1
self.b_Y1 = b_Y1
self.f_Y2 = f_Y2
self.b_Y2= b_Y2
self.p_M = (f_M * b_M) / f_E[n,m]
self.p_X = (f_X * b_X) / f_E[n,m]
self.p_Y = (f_Y * b_Y) / f_E[n,m]
def print_alignment(self, i, j):
seq1 = self.alignments[i, j]
seq2 = self.alignments[j, i]
aligner = self.aligners[i, j]
score = self.scores[i, j]
print(f"score = {score}\n{seq1}\n{aligner}\n{seq2}")
def print_all_alignments(self):
N = len(self.seqs)
if N > 1:
for i in range(N):
for j in range(i, N):
print(f"Alignment {i} with {j}")
self.print_alignment(i, j)
print()
else:
print("At least two sequences are expected")
seqs = ["acttgcgcctcgc",
"acgggcgcctcgc", #mismatchs
"actcgcctcgc", #deletion
"acttgagtcgcctcgc"] #insertion
SW = SmithWaterman(seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=1, del_ext=1)
SW.align()
SW.print_all_alignments()
Alignment 0 with 0 score = 13.0 acttgcgcctcgc ||||||||||||| acttgcgcctcgc Alignment 0 with 1 score = 9.0 acttgcgcctcgc ||..||||||||| acgggcgcctcgc Alignment 0 with 2 score = 9.0 acttgcgcctcgc || | |||||||| ac-t-cgcctcgc Alignment 0 with 3 score = 10.0 actt--g-cgcctcgc |||| | |||||||| acttgagtcgcctcgc Alignment 1 with 1 score = 13.0 acgggcgcctcgc ||||||||||||| acgggcgcctcgc Alignment 1 with 2 score = 8.0 ggcgcctcg ..||||||| ctcgcctcg Alignment 1 with 3 score = 8.0 cgggcgcctcg ..|.||||||| gagtcgcctcg Alignment 2 with 2 score = 11.0 actcgcctcgc ||||||||||| actcgcctcgc Alignment 2 with 3 score = 9.0 actcgcctcgc |.||||||||| agtcgcctcgc Alignment 3 with 3 score = 16.0 acttgagtcgcctcgc |||||||||||||||| acttgagtcgcctcgc
Выравнивания последовательностей соответсвуют ожидаемым
SW = SmithWaterman(seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=5, del_ext=1)
SW.align()
SW.print_all_alignments()
Alignment 0 with 0 score = 13.0 acttgcgcctcgc ||||||||||||| acttgcgcctcgc Alignment 0 with 1 score = 9.0 acttgcgcctcgc ||..||||||||| acgggcgcctcgc Alignment 0 with 2 score = 8.0 tgcgcctcg ..||||||| ctcgcctcg Alignment 0 with 3 score = 8.0 tgcgcctcg ..||||||| gtcgcctcg Alignment 1 with 1 score = 13.0 acgggcgcctcgc ||||||||||||| acgggcgcctcgc Alignment 1 with 2 score = 8.0 ggcgcctcg ..||||||| ctcgcctcg Alignment 1 with 3 score = 8.0 cgggcgcctcg ..|.||||||| gagtcgcctcg Alignment 2 with 2 score = 11.0 actcgcctcgc ||||||||||| actcgcctcgc Alignment 2 with 3 score = 9.0 actcgcctcgc |.||||||||| agtcgcctcgc Alignment 3 with 3 score = 16.0 acttgagtcgcctcgc |||||||||||||||| acttgagtcgcctcgc
Из-за очень высокого штрафа за открытие делеции оптимальными оказываются короткие выравнивания с мисмэтчами, а не длинные с делециями.
alphabet = "atgc"
seqs = []
np.random.seed(42)
for i in range(20):
seqs.append("".join(np.random.choice(list(alphabet), 200)))
SW = SmithWaterman(seqs=seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=1, del_ext=1)
SW.align()
plt.hist(SW.scores[np.triu_indices(20,1)])
(array([ 8., 18., 17., 32., 29., 58., 15., 8., 2., 3.]), array([20. , 22.2, 24.4, 26.6, 28.8, 31. , 33.2, 35.4, 37.6, 39.8, 42. ]), <BarContainer object of 10 artists>)
SW = SmithWaterman(seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=5, del_ext=1)
SW.align()
plt.hist(SW.scores[np.triu_indices(20,1)])
(array([ 1., 40., 51., 56., 0., 31., 8., 2., 0., 1.]), array([ 6. , 6.8, 7.6, 8.4, 9.2, 10. , 10.8, 11.6, 12.4, 13.2, 14. ]), <BarContainer object of 10 artists>)
В случае афинных штрафов выравнивания случайных последовательностей имеют меньший вес
seqs = ["aactctcgtacgtacgtgcgggctacg"]
SW = SmithWaterman(seqs=seqs, match_matrix=2 * np.eye(4) - np.ones((4, 4)), del_open=1, del_ext=1)
SW.fb_two(0,0)
plt.imshow(SW.p_M)
plt.colorbar()
<matplotlib.colorbar.Colorbar at 0x7d311b87ed40>
По всей видимости в реализации алгоритма есть какая-то ошибка, но я не могу её найти. Как ещё объяснить почему даже выравнивание последовательности против самой себя имеет очень маленькую вероятность я не могу. Ещё я не смог разобраться откуда берутся параметры для HMM.