import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
1B. Smith-Waterman algorithm with linear and affine gap penalty¶
def argmax(arr):
max_v, max_i = arr[0][0][0], (0, 0)
for i in range(len(arr)):
for j in range(len(arr[0])):
c_v = arr[i][j][0]
if c_v > max_v:
max_v, max_i = c_v, (i, j)
return max_i
def make_seqs(n_seq, size):
ss = []
for i in range(n_seq):
seq_ = ''.join(np.random.choice(['a', 'c', 'g', 't'], size=size))
ss.append(seq_)
return ss
def water(seq1, seq2, sub_matrix, penalty_type='Linear', del_=1, del_open=10, del_ext=0.5, show=True):
'''
Smith-Waterman algorithm for pairwise alignment.
Parameters:
----------
seq1: str
A sequence for the local pairwise alignment
seq2: str
A sequence for the local pairwise alignment
sub_matrix: dict
A substituion matrix
penalty_type: str, opt, default="Linear"
Gap penalty type. A "Linear" gap penalty has the same scores for opening and
extending the gap, as opposed to an "Affine" gap penalty
del_: float, opt, default=1.0
A gap penalty used when penalty_type is set to "Linear"
del_open: float, opt, default=10.0
A gap opening penalty used when penalty_type is set to "Affine"
del_ext: float, opt, default=0.5
A gap extension penalty used when penalty_type is set to "Affine"
show: bool, opt, default=True
If True, the alignment will be shown
----------
'''
m, n = len(seq1), len(seq2)
if penalty_type == 'Linear':
backtracking_matrix = [[0 for i in range(n + 1)] for j in range(m + 1)]
dyn_matrix = np.zeros((m + 1, n + 1, 1))
for i in range(1, m + 1):
for j in range(1, n + 1):
vert_score = dyn_matrix[i - 1, j, 0] - del_
hor_score = dyn_matrix[i, j - 1, 0] - del_
diag_score = dyn_matrix[i - 1, j - 1, 0] + sub_matrix[seq1[i - 1]][seq2[j - 1]]
dyn_matrix[i, j, 0] = max(vert_score,
hor_score,
diag_score,
0)
# fill backtracking matrix
if dyn_matrix[i, j, 0] == diag_score or dyn_matrix[i, j, 0] == 0:
backtracking_matrix[i][j] = [i - 1, j - 1]
elif dyn_matrix[i, j, 0] == vert_score:
backtracking_matrix[i][j] = [i - 1, j]
else:
backtracking_matrix[i][j] = [i, j - 1]
elif penalty_type == 'Affine':
# stores alignment scores in format [max_score, X_score, Y_score, diag_score]
dyn_matrix = np.zeros((m + 1, n + 1, 4))
# stores traceback indices
backtracking_matrix = [[0 for i in range(n + 1)] for j in range(m + 1)]
for i in range(1, m + 1):
for j in range(1, n + 1):
diag_X, diag_Y, diag_w = dyn_matrix[i - 1, j - 1, 1:] + sub_matrix[seq1[i - 1]][seq2[j - 1]]
vert_X = dyn_matrix[i - 1, j, 1] - del_ext
hor_Y = dyn_matrix[i, j - 1, 2] - del_ext
w_open_vert = dyn_matrix[i - 1, j, 3] - del_open - del_ext
w_open_hor = dyn_matrix[i, j - 1, 3] - del_open - del_ext
dyn_matrix[i, j, 3] = max(diag_X,
diag_Y,
diag_w,
0)
dyn_matrix[i, j, 1] = max(vert_X, w_open_vert)
dyn_matrix[i, j, 2] = max(hor_Y, w_open_hor)
dyn_matrix[i, j, 0] = max(dyn_matrix[i, j, 1:])
transition = np.argmax(dyn_matrix[i, j, 1:]) + 1
# fill backtracking matrix
# diagonal transition (pi_w)
if transition == 3:
backtracking_matrix[i][j] = [i - 1, j - 1]
# vertical transition (pi_X)
elif transition == 1:
backtracking_matrix[i][j] = [i - 1, j]
# horizontal transition (pi_Y)
else:
backtracking_matrix[i][j] = [i, j - 1]
return traceback(dyn_matrix, backtracking_matrix, seq1, seq2, show)
def traceback(d_matrix, b_matrix, s1_, s2_, show=True):
# backtracking and obtaining results
cur_i, cur_j = argmax(d_matrix)
score = d_matrix[cur_i, cur_j, 0]
prev_i, prev_j = cur_i, cur_j
start_i, start_j = cur_i, cur_j
align_1 = ''
align_2 = ''
matches = ''
while d_matrix[cur_i, cur_j, 0] != 0:
cur_i, cur_j = b_matrix[prev_i][prev_j]
# deletion in seq1
if cur_j == prev_j:
align_1 += s1_[prev_i - 1]
align_2 += '-'
matches += ' '
# deletion in seq2
elif cur_i == prev_i:
align_1 += '-'
align_2 += s2_[prev_j - 1]
matches += ' '
# no deletion
else:
align_1 += s1_[prev_i - 1]
align_2 += s2_[prev_j - 1]
if s1_[prev_i - 1] == s2_[prev_j - 1]:
matches += '|'
else:
matches += ' '
prev_i, prev_j = cur_i, cur_j
if show:
# print an alignment
print('Score:', score)
print(f'Identity: {matches.count("|") * 100 / len(matches):.01f} %')
print(f'{cur_i}\t{align_1[::-1]}\t{start_i - 1}')
print(f'\t{matches[::-1]}\t')
print(f'{cur_j}\t{align_2[::-1]}\t{start_j - 1}')
return score
Проверим работу на нескольких примерах и сравним с результатами, выдаваемыми реализацией алгоритма Ватермана-Смита.
Первый пример -- полностью совпадающие случайные последовательности длины 50. Score выравниваний проверен с помощью сервисов для локального выравнивания алгоритмом Ватермана-Смита с аффинными) и линейными штрафами.
s1 = s2 = make_seqs(1, 50)[0]
sm_nuc = {y: {x: 1 if x == y else -1 for x in ['a', 'c', 'g', 't']} for y in ['a', 'c', 'g', 't']}
water(s1, s2, sm_nuc, penalty_type='Linear', del_=1)
Score: 50.0 Identity: 100.0 % 0 gtccaagcttcattaaataacggcttatccaccatcaccccgaagttgga 49 |||||||||||||||||||||||||||||||||||||||||||||||||| 0 gtccaagcttcattaaataacggcttatccaccatcaccccgaagttgga 49
50.0
water(s1, s2, sm_nuc, penalty_type='Affine', del_open=5, del_ext=1)
Score: 50.0 Identity: 100.0 % 0 gtccaagcttcattaaataacggcttatccaccatcaccccgaagttgga 49 |||||||||||||||||||||||||||||||||||||||||||||||||| 0 gtccaagcttcattaaataacggcttatccaccatcaccccgaagttgga 49
50.0
Теперь добавим небольшой мисматч.
s1 = make_seqs(1, 50)[0]
s2 = s1[:25] + 'actg' + s1[29:]
water(s1, s2, sm_nuc, penalty_type='Linear', del_=1)
Score: 44.0 Identity: 94.0 % 0 caccgaggcttcgaccgttattggtggagtcgatgattgaaaacaactcg 49 ||||||||||||||||||||||||| |||||||||||||||||||||| 0 caccgaggcttcgaccgttattggtactgtcgatgattgaaaacaactcg 49
44.0
water(s1, s2, sm_nuc, penalty_type='Affine', del_open=5, del_ext=1)
Score: 44.0 Identity: 94.0 % 0 caccgaggcttcgaccgttattggtggagtcgatgattgaaaacaactcg 49 ||||||||||||||||||||||||| |||||||||||||||||||||| 0 caccgaggcttcgaccgttattggtactgtcgatgattgaaaacaactcg 49
44.0
Добавим одиночные делеции на небольшом расстоянии друг от друга. На примере ниже хорошо демонстрируется разница между аффинными и линейными штрафами за гэпы.
s1 = make_seqs(1, 50)[0]
s2 = s1[:20] + s1[21:23] + s1[24:26] + s1[27:29] + s1[30:]
water(s1, s2, sm_nuc, penalty_type='Linear', del_=1)
Score: 42.0 Identity: 92.0 % 0 tcttttctgacgaaaatgtcaacaccgtacaagccggtccctatcgggga 49 |||||||||||||||||||| || || || |||||||||||||||||||| 0 tcttttctgacgaaaatgtc-ac-cc-ta-aagccggtccctatcgggga 45
42.0
water(s1, s2, sm_nuc, penalty_type='Affine', del_open=5, del_ext=1)
Score: 30.0 Identity: 90.0 % 0 tcttttctgacgaaaatgtcaacaccgtacaagccggtccctatcgggga 49 ||||||||||||||||||||| || || |||||||||||||||||||| 0 tcttttctgacgaaaatgtca---cccta-aagccggtccctatcgggga 45
30.0
Сделаем последовательности с совпадающим участком.
s1 = make_seqs(1, 50)[0]
s2 = make_seqs(1, 15)[0] + s1[5:15] + make_seqs(1, 20)[0]
water(s1, s2, sm_nuc, penalty_type='Linear', del_=1)
Score: 13.0 Identity: 76.0 % 4 aggtcgtgagccct-tg-tcga-at 25 |||||||||| | | || | || || 14 aggtcgtgag-catgtgat-gatat 36
13.0
water(s1, s2, sm_nuc, penalty_type='Affine', del_open=5, del_ext=1)
Score: 11.0 Identity: 100.0 % 4 aggtcgtgagc 14 ||||||||||| 14 aggtcgtgagc 24
11.0
1B. Pairwise alignment for every pair of 20 random sequences of length 200¶
sequences = make_seqs(20, 200)
print(len(sequences), len(sequences[0]))
20 200
weights_lin = []
for i in tqdm(range(len(sequences))):
for j in range(len(sequences)):
s_1, s_2 = sequences[i], sequences[j]
alignmnent_ij = water(s_1, s_2, sm_nuc, 'Linear', del_=1, show=False)
weights_lin.append(alignmnent_ij)
weights_aff = []
for i in tqdm(range(len(sequences))):
for j in range(len(sequences)):
s_1, s_2 = sequences[i], sequences[j]
alignmnent_ij = water(s_1, s_2, sm_nuc, 'Affine', del_open=5, del_ext=1, show=False)
weights_aff.append(alignmnent_ij)
100%|██████████| 20/20 [01:05<00:00, 3.30s/it] 100%|██████████| 20/20 [04:23<00:00, 13.18s/it]
Построим гистограммы для каждого типа штрафов и уберем выравнивания последовательностей на самих себя.
plt.hist([x for x in weights_lin if x < 200], color='orange')
plt.title('Alignment scores distribution for linear gap penalty')
plt.xlabel('Score')
Text(0.5, 0, 'Score')
Заметим, что в случае линейных штрафов за гэпы форма распределения напоминает гауссиану (но это распределение Гамбела), для афинных штрафов сложно сказать. Также по гистограммам видим, что в среднем у выравнивания с линейными штрафами для случайных последовательностей больший вес (с нашими исходными параметрами это в целом ожидаемо -- большой штраф за открытие гэпа).
plt.hist([x for x in weights_aff if x < 200], color='orange')
plt.title('Alignment scores distribution for affine gap penalty')
plt.xlabel('Score')
Text(0.5, 0, 'Score')