image.png

In [ ]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

1B. Smith-Waterman algorithm with linear and affine gap penalty¶

In [ ]:
def argmax(arr):
  max_v, max_i = arr[0][0][0], (0, 0)
  for i in range(len(arr)):
    for j in range(len(arr[0])):
      c_v = arr[i][j][0]
      if  c_v > max_v:
        max_v, max_i = c_v, (i, j)
  return max_i


def make_seqs(n_seq, size):
  ss = []
  for i in range(n_seq):
    seq_ = ''.join(np.random.choice(['a', 'c', 'g', 't'], size=size))
    ss.append(seq_)
  return ss
In [ ]:
def water(seq1, seq2, sub_matrix, penalty_type='Linear', del_=1, del_open=10, del_ext=0.5, show=True):
  '''
  Smith-Waterman algorithm for pairwise alignment.
  Parameters:
  ----------
  seq1: str
    A sequence for the local pairwise alignment
  seq2: str
    A sequence for the local pairwise alignment
  sub_matrix: dict
    A substituion matrix
  penalty_type: str, opt, default="Linear"
    Gap penalty type. A "Linear" gap penalty has the same scores for opening and
    extending the gap, as opposed to an "Affine" gap penalty
  del_: float, opt, default=1.0
    A gap penalty used when penalty_type is set to "Linear"
  del_open: float, opt, default=10.0
    A gap opening penalty used when penalty_type is set to "Affine"
  del_ext: float, opt, default=0.5
    A gap extension penalty used when penalty_type is set to "Affine"
  show: bool, opt, default=True
    If True, the alignment will be shown
  ----------
  '''
  m, n = len(seq1), len(seq2)

  if penalty_type == 'Linear':
    backtracking_matrix = [[0 for i in range(n + 1)] for j in range(m + 1)]
    dyn_matrix = np.zeros((m + 1, n + 1, 1))
    for i in range(1, m + 1):
      for j in range(1, n + 1):
            vert_score = dyn_matrix[i - 1, j, 0] - del_
            hor_score = dyn_matrix[i, j - 1, 0] - del_
            diag_score = dyn_matrix[i - 1, j - 1, 0] + sub_matrix[seq1[i - 1]][seq2[j - 1]]
            dyn_matrix[i, j, 0] = max(vert_score,
                                   hor_score,
                                   diag_score,
                                   0)
            # fill backtracking matrix
            if dyn_matrix[i, j, 0] == diag_score or dyn_matrix[i, j, 0] == 0:
              backtracking_matrix[i][j] = [i - 1, j - 1]
            elif dyn_matrix[i, j, 0] == vert_score:
              backtracking_matrix[i][j] = [i - 1, j]
            else:
              backtracking_matrix[i][j] = [i, j - 1]


  elif penalty_type == 'Affine':
    # stores alignment scores in format [max_score, X_score, Y_score, diag_score]
    dyn_matrix = np.zeros((m + 1, n + 1, 4))
    # stores traceback indices
    backtracking_matrix = [[0 for i in range(n + 1)] for j in range(m + 1)]
    for i in range(1, m + 1):
      for j in range(1, n + 1):
        diag_X, diag_Y, diag_w = dyn_matrix[i - 1, j - 1, 1:] + sub_matrix[seq1[i - 1]][seq2[j - 1]]
        vert_X = dyn_matrix[i - 1, j, 1] - del_ext
        hor_Y = dyn_matrix[i, j - 1, 2] - del_ext
        w_open_vert = dyn_matrix[i - 1, j, 3] - del_open - del_ext
        w_open_hor = dyn_matrix[i, j - 1, 3] - del_open - del_ext
        dyn_matrix[i, j, 3] = max(diag_X,
                         diag_Y,
                         diag_w,
                         0)
        dyn_matrix[i, j, 1] = max(vert_X, w_open_vert)
        dyn_matrix[i, j, 2] = max(hor_Y, w_open_hor)
        dyn_matrix[i, j, 0] = max(dyn_matrix[i, j, 1:])
        transition = np.argmax(dyn_matrix[i, j, 1:]) + 1
        # fill backtracking matrix
        # diagonal transition (pi_w)
        if transition == 3:
          backtracking_matrix[i][j] = [i - 1, j - 1]
        # vertical transition (pi_X)
        elif transition == 1:
          backtracking_matrix[i][j] = [i - 1, j]
        # horizontal transition (pi_Y)
        else:
          backtracking_matrix[i][j] = [i, j - 1]

  return traceback(dyn_matrix, backtracking_matrix, seq1, seq2, show)

def traceback(d_matrix, b_matrix, s1_, s2_, show=True):
  # backtracking and obtaining results
  cur_i, cur_j = argmax(d_matrix)
  score = d_matrix[cur_i, cur_j, 0]
  prev_i, prev_j = cur_i, cur_j
  start_i, start_j = cur_i, cur_j
  align_1 = ''
  align_2 = ''
  matches = ''
  while d_matrix[cur_i, cur_j, 0] != 0:
    cur_i, cur_j = b_matrix[prev_i][prev_j]
    # deletion in seq1
    if cur_j == prev_j:
      align_1 += s1_[prev_i - 1]
      align_2 += '-'
      matches += ' '
    # deletion in seq2
    elif cur_i == prev_i:
      align_1 += '-'
      align_2 += s2_[prev_j - 1]
      matches += ' '
    # no deletion
    else:
      align_1 += s1_[prev_i - 1]
      align_2 += s2_[prev_j - 1]
      if s1_[prev_i - 1] == s2_[prev_j - 1]:
        matches += '|'
      else:
        matches += ' '
    prev_i, prev_j = cur_i, cur_j

  if show:
    # print an alignment
    print('Score:', score)
    print(f'Identity: {matches.count("|") * 100 / len(matches):.01f} %')
    print(f'{cur_i}\t{align_1[::-1]}\t{start_i - 1}')
    print(f'\t{matches[::-1]}\t')
    print(f'{cur_j}\t{align_2[::-1]}\t{start_j - 1}')
  return score

Проверим работу на нескольких примерах и сравним с результатами, выдаваемыми реализацией алгоритма Ватермана-Смита.
Первый пример -- полностью совпадающие случайные последовательности длины 50. Score выравниваний проверен с помощью сервисов для локального выравнивания алгоритмом Ватермана-Смита с аффинными) и линейными штрафами.

In [ ]:
s1 = s2 = make_seqs(1, 50)[0]
sm_nuc = {y: {x: 1 if x == y else -1 for x in ['a', 'c', 'g', 't']} for y in ['a', 'c', 'g', 't']}
water(s1, s2, sm_nuc, penalty_type='Linear', del_=1)
Score: 50.0
Identity: 100.0 %
0	gtccaagcttcattaaataacggcttatccaccatcaccccgaagttgga	49
	||||||||||||||||||||||||||||||||||||||||||||||||||	
0	gtccaagcttcattaaataacggcttatccaccatcaccccgaagttgga	49
Out[ ]:
50.0
In [ ]:
water(s1, s2, sm_nuc, penalty_type='Affine', del_open=5, del_ext=1)
Score: 50.0
Identity: 100.0 %
0	gtccaagcttcattaaataacggcttatccaccatcaccccgaagttgga	49
	||||||||||||||||||||||||||||||||||||||||||||||||||	
0	gtccaagcttcattaaataacggcttatccaccatcaccccgaagttgga	49
Out[ ]:
50.0

Теперь добавим небольшой мисматч.

In [ ]:
s1 = make_seqs(1, 50)[0]
s2 = s1[:25] + 'actg' + s1[29:]
water(s1, s2, sm_nuc, penalty_type='Linear', del_=1)
Score: 44.0
Identity: 94.0 %
0	caccgaggcttcgaccgttattggtggagtcgatgattgaaaacaactcg	49
	|||||||||||||||||||||||||   ||||||||||||||||||||||	
0	caccgaggcttcgaccgttattggtactgtcgatgattgaaaacaactcg	49
Out[ ]:
44.0
In [ ]:
water(s1, s2, sm_nuc, penalty_type='Affine', del_open=5, del_ext=1)
Score: 44.0
Identity: 94.0 %
0	caccgaggcttcgaccgttattggtggagtcgatgattgaaaacaactcg	49
	|||||||||||||||||||||||||   ||||||||||||||||||||||	
0	caccgaggcttcgaccgttattggtactgtcgatgattgaaaacaactcg	49
Out[ ]:
44.0

Добавим одиночные делеции на небольшом расстоянии друг от друга. На примере ниже хорошо демонстрируется разница между аффинными и линейными штрафами за гэпы.

In [ ]:
s1 = make_seqs(1, 50)[0]
s2 = s1[:20] + s1[21:23] + s1[24:26] + s1[27:29] + s1[30:]
water(s1, s2, sm_nuc, penalty_type='Linear', del_=1)
Score: 42.0
Identity: 92.0 %
0	tcttttctgacgaaaatgtcaacaccgtacaagccggtccctatcgggga	49
	|||||||||||||||||||| || || || ||||||||||||||||||||	
0	tcttttctgacgaaaatgtc-ac-cc-ta-aagccggtccctatcgggga	45
Out[ ]:
42.0
In [ ]:
water(s1, s2, sm_nuc, penalty_type='Affine', del_open=5, del_ext=1)
Score: 30.0
Identity: 90.0 %
0	tcttttctgacgaaaatgtcaacaccgtacaagccggtccctatcgggga	49
	|||||||||||||||||||||   || || ||||||||||||||||||||	
0	tcttttctgacgaaaatgtca---cccta-aagccggtccctatcgggga	45
Out[ ]:
30.0

Сделаем последовательности с совпадающим участком.

In [ ]:
s1 = make_seqs(1, 50)[0]
s2 = make_seqs(1, 15)[0] + s1[5:15] + make_seqs(1, 20)[0]
water(s1, s2, sm_nuc, penalty_type='Linear', del_=1)
Score: 13.0
Identity: 76.0 %
4	aggtcgtgagccct-tg-tcga-at	25
	|||||||||| | | || | || ||	
14	aggtcgtgag-catgtgat-gatat	36
Out[ ]:
13.0
In [ ]:
water(s1, s2, sm_nuc, penalty_type='Affine', del_open=5, del_ext=1)
Score: 11.0
Identity: 100.0 %
4	aggtcgtgagc	14
	|||||||||||	
14	aggtcgtgagc	24
Out[ ]:
11.0

1B. Pairwise alignment for every pair of 20 random sequences of length 200¶

In [ ]:
sequences = make_seqs(20, 200)
In [ ]:
print(len(sequences), len(sequences[0]))
20 200
In [ ]:
weights_lin = []

for i in tqdm(range(len(sequences))):
  for j in range(len(sequences)):
    s_1, s_2 = sequences[i], sequences[j]
    alignmnent_ij = water(s_1, s_2, sm_nuc, 'Linear', del_=1, show=False)
    weights_lin.append(alignmnent_ij)

weights_aff = []

for i in tqdm(range(len(sequences))):
  for j in range(len(sequences)):
    s_1, s_2 = sequences[i], sequences[j]
    alignmnent_ij = water(s_1, s_2, sm_nuc, 'Affine', del_open=5, del_ext=1, show=False)
    weights_aff.append(alignmnent_ij)
100%|██████████| 20/20 [01:05<00:00,  3.30s/it]
100%|██████████| 20/20 [04:23<00:00, 13.18s/it]

Построим гистограммы для каждого типа штрафов и уберем выравнивания последовательностей на самих себя.

In [ ]:
plt.hist([x for x in weights_lin if x < 200], color='orange')
plt.title('Alignment scores distribution for linear gap penalty')
plt.xlabel('Score')
Out[ ]:
Text(0.5, 0, 'Score')
No description has been provided for this image

Заметим, что в случае линейных штрафов за гэпы форма распределения напоминает гауссиану (но это распределение Гамбела), для афинных штрафов сложно сказать. Также по гистограммам видим, что в среднем у выравнивания с линейными штрафами для случайных последовательностей больший вес (с нашими исходными параметрами это в целом ожидаемо -- большой штраф за открытие гэпа).

In [ ]:
plt.hist([x for x in weights_aff if x < 200], color='orange')
plt.title('Alignment scores distribution for affine gap penalty')
plt.xlabel('Score')
Out[ ]:
Text(0.5, 0, 'Score')
No description has been provided for this image