import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from scipy.stats import norm
Задание 1¶
Задание 1.1 Генерация данных¶
Ниже имеются гистограммы, отражающие распределение сгенерированных данных.
np.random.seed(42)
n1 = int(input())
n2 = int(input())
mu1, sd1, mu2, sd2 = 0, 1, 1, 1
x1 = np.random.normal(mu1, sd1, n1)
x2 = np.random.normal(mu2, sd2, n2)
x = np.concatenate([x1, x2])
answers = [0 for i in range(n1)]
answers.extend([1 for i in range(n2)])
100 5
Задание 1.2 EM-оценка¶
Формула логарифма правдоподобия, с помощью которого подбирается оптимальное k на каждом шаге оптимизации
Шаг 0: начинаем со случайных разбиений
Шаг 1: оценка параметров разбиений (шаг оценки)
Шаг 2: оптимизация разбиений с помощью логарифма правдоподобия (шаг максимизации) --> вернуться к шагу 1
# @title EM алгоритм
def maximize_likelihood(arr, exp1, exp2, disp1, disp2):
likelihoods = []
sorted_arr = sorted(arr)
for i in range(2, len(arr) - 2):
arr1, arr2 = sorted_arr[:i], sorted_arr[i + 1:]
likelihood_log = - np.sum((arr1 - exp1) ** 2) / (2 * disp1 ** 2) - np.sum((arr2 - exp2) ** 2) / (2 * disp2 ** 2)
likelihoods.append((likelihood_log, i))
return max(likelihoods, key=lambda x: x[0])
def expectation_maximization(arr, convergence_threshold=0.1, n_convergence=15):
k = np.random.randint(2, len(arr) - 2)
x1_p, x2_p = sorted(arr)[:k], sorted(arr)[k + 1:]
prev_curr_likelihood, cnt = 0, 0
while cnt < n_convergence:
# Expectation step
e1, e2, d1, d2 = np.mean(x1_p), np.mean(x2_p), np.std(x1_p, ddof=1), np.std(x2_p, ddof=1)
# Maximization step
current_likelihood, k = maximize_likelihood(arr, e1, e2, d1, d2)
change = current_likelihood - prev_curr_likelihood
prev_curr_likelihood = current_likelihood
# Convergence check
if change < convergence_threshold:
cnt += 1
else:
cnt = 0
x1_p, x2_p = sorted(arr)[:k], sorted(arr)[k + 1:]
if e1 > e2:
e1, d1, e2, d2 = e2, d2, e1, d1
return k, e1, d1, e2, d2
Подобрав оптимальное k для разбиений, найдем необходимые для ответа величины. Во-первых, мат.ожидания μ1, μ2.
res_1 = expectation_maximization(x)
print(f'μ1 is {res_1[1]:.2f}, correct is {mu1}\nμ2 is {res_1[3]:.2f}, correct is {mu2}')
μ1 is -0.66, correct is 0 μ2 is 1.13, correct is 1
Рассмотрим, как выглядят разбиения, построенные с помощью expectation maximization и реальные данные. Заметнее всего то, что не получилось точно предсказать стандартное отклонение.
# @title Рисование графиков
def draw_EM_results(arr, x1_orig, x2_orig, EM_results):
k, e1, d1, e2, d2 = EM_results
part1, part2 = sorted(arr)[:k], sorted(arr)[k + 1:]
fig, ax = plt.subplots(1, 2, figsize=(12, 4))
ax[0].set_title('Original distributions')
ax[1].set_title('Predicted distributions')
for a in [x1_orig, x2_orig]:
sns.histplot(a, ax=ax[0], stat='density', bins=np.arange(min(arr), max(arr), 0.3), alpha=0.7, ls='None')
x_axis = np.arange(min(arr), max(arr), 0.3)
y_axis_1 = norm.pdf(x_axis, e1, d1)
y_axis_2 = norm.pdf(x_axis, e2, d2)
ax[1].plot(x_axis, y_axis_1)
ax[1].plot(x_axis, y_axis_2)
for a in [x1_orig, x2_orig]:
sns.histplot(a, ax=ax[1], stat='density', color='teal', alpha=0.3, ls='None')
draw_EM_results(x, x1, x2, res_1)
Определим для каждого числа вероятность принадлежать распределению 1 и 2, основываясь на построенной модели, и сравним с исходными распределениями с помощью метрики accuracy score.
from sklearn.metrics import accuracy_score
def probabilities(arr, m1, s1, m2, s2, n1, n2):
prior_p1, prior_p2 = n1 / (n1 + n2), n2 / (n1 + n2)
res = []
for el in arr:
p1, p2 = norm.pdf(el, m1, s1), norm.pdf(el, m2, s2)
p = prior_p1 * p1 + prior_p2 * p2
res.append((prior_p1 * p1 / p, prior_p2 * p2 / p))
return res
probs = probabilities(x, *res_1[1:], len(x1), len(x2))
predicted = [0 if el[0] > el[1] else 1 for el in probs]
round(accuracy_score(answers, predicted), 4)
0.686
Сами вероятности выглядят так, первый элемент - вероятность принадлежать первому распределению, второй - второму.
probs[:2]
[(0.33309915402406204, 0.6669008459759379), (0.8768575471952835, 0.12314245280471656)]
Также определим количество чисел, пришедших из распределений 1 и 2, согласно предсказаниям модели.
print(f'Принадлежат 1-му распределению: {len(predicted) - sum(predicted)}, второму: {sum(predicted)}')
Принадлежат 1-му распределению: 751, второму: 749
Проведя вышеперечисленные действия для датасетов размеров 10 и 5, 100 и 50, 1000 и 500, я пришла к следующему выводу: точность модели растет при повышении размеров датасетов. Она меньше подвержена влиянию выбросов при увеличении размеров выборки.
Также хочется отметить, что точность возрастает и при снижении пересечения двух распределений (см.ниже -- увеличим параметры исходного распределения (исходно это (1, 1)), а именно матожидание).
test_x2, test_x3 = np.random.normal(mu2 + 1, sd2, n2), np.random.normal(mu2 + 2, sd2, n2)
test1 = np.concatenate([x1, test_x2])
test2 = np.concatenate([x1, test_x3])
res_2 = expectation_maximization(test1)
res_3 = expectation_maximization(test2)
print(f'μ1 is {res_2[1]:.2f}, correct is {mu1}\nμ2 is {res_2[3]:.2f}, correct is {mu2 + 1}')
print(f'μ1 is {res_3[1]:.2f}, correct is {mu1}\nμ2 is {res_3[3]:.2f}, correct is {mu2 + 2}')
μ1 is -0.36, correct is 0 μ2 is 1.92, correct is 2 μ1 is -0.29, correct is 0 μ2 is 2.61, correct is 3
draw_EM_results(test1, x1, test_x2, res_2)
draw_EM_results(test2, x1, test_x3, res_3)
probs_1 = probabilities(test1, *res_2[1:], len(x1), len(x2))
predicted_1 = [0 if el[0] > el[1] else 1 for el in probs_1]
round(accuracy_score(answers, predicted_1), 4)
0.846
probs_2 = probabilities(test2, *res_3[1:], len(x1), len(x2))
predicted_2 = [0 if el[0] > el[1] else 1 for el in probs_2]
round(accuracy_score(answers, predicted_2), 4)
0.9127
Действительно, точность предсказаний возросла.
Задание 2. Последовательности и их модели¶
2.1 Создаем последовательности¶
Сгенерируем 1000 последовательностей по 3000 bp над алфавитом {a, b}. Вероятности переходов выберем случайно (независимых из них только две).
a = ['a', 'b']
prob_1, prob_2, start_prob = np.random.random(), np.random.rand(), np.random.rand()
starting_prob = [start_prob, 1 - start_prob]
transition_matrix = [[prob_1, 1 - prob_1],
[prob_2, 1-prob_2]]
pd.DataFrame(transition_matrix, columns=a, index=a)
a | b | |
---|---|---|
a | 0.518791 | 0.481209 |
b | 0.703019 | 0.296981 |
def create_seq_markov(alphabet, n, weights, starting_weights):
res = np.random.choice(alphabet, p=starting_weights)
for i in range(1, n):
prev_symbol = res[i - 1]
res += np.random.choice(alphabet, p=weights[alphabet.index(prev_symbol)])
return res
sequences = []
for i in range(1000):
sequences.append(create_seq_markov(a, 3000, transition_matrix, starting_prob))
2.2 (и заодно 2.4.1) оценка моделей¶
Теперь оценим несколько моделей, которые можно использовать для генерации таких последовательностей: бернуллиевскую и марковские 1-3 порядков. Для этого используем критерии AIC (Akaike information criterion) и BIC (Bayesian information criterion).
Начнем с биномиального распределения. Количество (независимых) параметров для нее = 1. Значения критериев рассчитываются по следующим формулам:
,
где k - число параметров модели, L - оптимальное правдоподобие, n - размер (обучающей) выборки
# @title Информационные критерии для модели Бернулли
from collections import Counter
def calculate_criteria_binomial(seqs, alphabet, n, l):
'''Модель -- биномиальное распределение'''
frequencies = Counter(''.join(seqs))
frequencies = dict(zip(alphabet, [frequencies[k] / (n * l) for k, v in frequencies.items()]))
print('Frequencies:', frequencies)
log_L = 0
for seq in seqs:
log_prob_seq = 0
for el in seq:
log_prob_seq += np.log(frequencies[el])
log_L += log_prob_seq
AIC_bernoulli = 2 * 1 - 2 * log_L
BIC_bernoulli = 1 * np.log(l) - 2 * log_L
print(f'AIC: {AIC_bernoulli:.04f}, BIC: {BIC_bernoulli:.04f}\nlog(L) = {log_L:.04f}')
calculate_criteria_binomial(sequences, a, 3000, 1000)
Frequencies: {'a': 0.4063873333333333, 'b': 0.5936126666666667} AIC: 4478762.8393, BIC: 4478767.7471 log(L) = -2239380.4197
Далее применим критерии для марковских моделей 1, 2 и 3 порядка. Для построения матрицы перехода посчитаем количество (n + 1)-меров, где n - порядок и частоты самих символов. Оценим соответствующие вероятности, используя формулу условной вероятности:
# @title Получение матрицы переходов для модели Маркова 1 порядка
from itertools import product
from collections import OrderedDict
def calculate_kmers(seqs, alphabet, k):
res = {''.join(el): 0 for el in product(''.join(alphabet), repeat=k)}
res = OrderedDict(res)
cnt = 0
for s in seqs:
for i in range(0, len(s) - k, k):
kmer = s[i: i + k]
res[kmer] += 1
cnt += 1
return {k: v / cnt for k, v in res.items()}
def create_transition_matrix(kmers, alphabet, frequencies):
res = pd.DataFrame(columns=alphabet)
for k, v in kmers.items():
kmer, letter = k[:-1], k[-1]
if len(kmer) < 1 or frequencies[kmer] == 0:
res.loc[kmer, letter] = v
else:
res.loc[kmer, letter] = v / frequencies[kmer]
return res
def markov_chain(seqs, alphabet, order):
matrices = []
prev_freq = None
# оценка начальных условий из предположения об эргодичности
# и также матрицы переходов
for k in range(order + 1):
kmers_freq = calculate_kmers(seqs, alphabet, k + 1)
cur_matrix = create_transition_matrix(kmers_freq, alphabet, prev_freq)
matrices.append(cur_matrix)
prev_freq = kmers_freq
return matrices
def calculate_criteria_markov(seqs, matrices, alphabet):
log_L = 0
order = len(matrices) - 1
for seq in seqs:
log_L += np.log(float(matrices[0][seq[0]]))
prev_el = seq[0]
for k in range(1, order):
kth_matrix = matrices[k]
cur_el = seq[k]
log_L += np.log(kth_matrix.loc[prev_el, cur_el])
prev_el += cur_el
t_matrix = matrices[-1]
for i in range(order, len(seq)):
cur_el, prev_el = seq[i], seq[i - order:i]
log_L += np.log(t_matrix.loc[prev_el, cur_el])
AIC = (len(alphabet) ** 2 - 1) * 2 - 2 * log_L
BIC = (len(alphabet) ** 2 - 1) * len(seqs) - 2 * log_L
return log_L, AIC, BIC
markov_matrices_1 = markov_chain(sequences, a, 1)
markov_matrices_1[-1]
a | b | |
---|---|---|
a | 0.518835 | 0.480309 |
b | 0.704073 | 0.297178 |
Теперь подсчитаем значения информационных критериев для этой модели. В результате получаем, что значения критериев для нее ниже, чем для биномиальной.
criteria_markov_1 = calculate_criteria_markov(sequences, markov_matrices_1, a)
print(f'AIC: {criteria_markov_1[1]:.04f}, BIC: {criteria_markov_1[2]:.04f}\nlog(L) = {criteria_markov_1[0]:.04f}')
AIC: 3949634.6152, BIC: 3952628.6152 log(L) = -1974814.3076
Далее сделаем то же самое для моделей 2 и 3 порядков. Матрица переходов для моделей 2 порядка (матрицы с начальными распределениями не показаны).
markov_matrices_2 = markov_chain(sequences, a, 2)
markov_matrices_2[-1]
a | b | |
---|---|---|
aa | 0.518211 | 0.480115 |
ab | 0.704337 | 0.295215 |
ba | 0.521393 | 0.480098 |
bb | 0.701788 | 0.300007 |
И значения информационных критериев:
criteria_markov_2 = calculate_criteria_markov(sequences, markov_matrices_2, a)
print(f'AIC: {criteria_markov_2[1]:.04f}, BIC: {criteria_markov_2[2]:.04f}\nlog(L) = {criteria_markov_2[0]:.04f}')
AIC: 3949655.0948, BIC: 3952649.0948 log(L) = -1974824.5474
Как видим, значения критериев у модели 2 порядка возросли. И аналогично для модели 3 порядка, матрица переходов:
markov_matrices_3 = markov_chain(sequences, a, 3)
markov_matrices_3[-1]
a | b | |
---|---|---|
aaa | 0.518141 | 0.479585 |
aab | 0.705886 | 0.295586 |
aba | 0.519254 | 0.479809 |
abb | 0.709289 | 0.300237 |
baa | 0.519078 | 0.476121 |
bab | 0.705236 | 0.296344 |
bba | 0.522037 | 0.48012 |
bbb | 0.697253 | 0.298549 |
И значения информационных критериев:
criteria_markov_3 = calculate_criteria_markov(sequences, markov_matrices_3, a)
print(f'AIC: {criteria_markov_3[1]:.04f}, BIC: {criteria_markov_3[2]:.04f}\nlog(L) = {criteria_markov_3[0]:.04f}')
AIC: 3949625.2775, BIC: 3952619.2775 log(L) = -1974809.6387
Значение критериев наименьшее у модели 3 порядка. Какой вывод можно сделать из этого? Марковские модели на сгенерированных последовательностях показали себя лучше (значения информационных критериев меньше), чем бернуллиевская (что и ожидалось). При этом наименьшие значения критериев оказались у модели 3 порядка (3 < 1 < 2).
2.3 Применение различных моделей на реальных данных (геноме B.subtilis)¶
Для этого задания я взяла референсный геном B.subtilis с GenBank ID GCA_000009045.1. Его размер 4.2 Mb.
!wget 'https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/000/009/045/GCA_000009045.1_ASM904v1/GCA_000009045.1_ASM904v1_genomic.fna.gz' -O genome.fna.gz
--2024-03-10 20:46:51-- https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/000/009/045/GCA_000009045.1_ASM904v1/GCA_000009045.1_ASM904v1_genomic.fna.gz Resolving ftp.ncbi.nlm.nih.gov (ftp.ncbi.nlm.nih.gov)... 130.14.250.11, 130.14.250.12, 2607:f220:41e:250::7, ... Connecting to ftp.ncbi.nlm.nih.gov (ftp.ncbi.nlm.nih.gov)|130.14.250.11|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 1248394 (1.2M) [application/x-gzip] Saving to: ‘genome.fna.gz’ genome.fna.gz 100%[===================>] 1.19M --.-KB/s in 0.09s 2024-03-10 20:46:52 (13.4 MB/s) - ‘genome.fna.gz’ saved [1248394/1248394]
!gunzip genome.fna.gz
with open('genome.fna') as inp:
# в первом строке название последовательности, его пропускаем
inp.readline()
genome = ''.join([line.strip() for line in inp])
Поделим всю последовательность на куски по 1000 нуклеотидов. Полный из них 4215, поэтому для простоты отбросим последний фрагмент (длина в районе 600 bp).
genome_split = [genome[i:i + 1000] for i in range(0, len(genome), 1000)]
genome_split = genome_split[:-1]
len(genome_split)
4215
Оценим параметры бернуллииевской модели и посчитаем информационные критерии для нее.
nucleotides = ['A', 'C', 'G', 'T']
calculate_criteria_binomial(genome_split, nucleotides, 1000, len(genome_split))
Frequencies: {'A': 0.2818268090154211, 'C': 0.283020640569395, 'G': 0.21806714116251483, 'T': 0.21708540925266903} AIC: 11760609.4301, BIC: 11760615.7765 log(L) = -5880303.7151
Далее используем Марковские модели 1-5 порядков. Для удобства значения критериев организованы в таблицу. Матрицы переходов не показаны (если требуется, они записаны в переменную genome_matrices).
model_results = pd.DataFrame(columns=[1, 2, 3, 4, 5], index=['LogL', 'AIC', 'BIC'])
genome_matrices = []
indices = ['LogL', 'AIC', 'BIC']
for m_order in range(1, 6):
markov_matrices = markov_chain(genome_split, nucleotides, m_order)
genome_matrices.append(markov_matrices[-1])
criteria_markov = calculate_criteria_markov(genome_split, markov_matrices, nucleotides)
for i in range(3):
model_results.loc[indices[i], m_order] = criteria_markov[i]
pd.options.display.float_format = '{:.4f}'.format
model_results['binomial'] = np.array([-5880303.7151, 11760609.4301, 11760615.7765])
model_results
1 | 2 | 3 | 4 | 5 | binomial | |
---|---|---|---|---|---|---|
LogL | -5736721.2391 | -5694604.5763 | -5676147.3045 | -5669468.4956 | -5668169.7666 | -5880303.7151 |
AIC | 11473472.4781 | 11389239.1526 | 11352324.6090 | 11338966.9912 | 11336369.5331 | 11760609.4301 |
BIC | 11536667.4781 | 11452434.1526 | 11415519.6090 | 11402161.9912 | 11399564.5331 | 11760615.7765 |
Визуализируем один из критериев, например, BIC. Заметим, что модели становятся лучше согласно обоим информационным критериям в порядке, указанном на графике (т.е. марковская модель 5 порядка -- самая лучшая). Интересно отметить, что после 3 порядка значения критериев изменяются не так сильно при переходе на более высокий порядок. В связи с этим можно было бы проверить модель на переобучение, создав тренировочную, тестовую и валидационную выборки.
fig = sns.scatterplot(x=[str(x) for x in list(model_results.columns)], y=model_results.loc[['BIC']].transpose()['BIC'].tolist())
fig.set_title('BIC')
Text(0.5, 1.0, 'BIC')
2.4.2 Проверка эргодичности реальных данных¶
Выделим подпоследовательности из нарезки, начиная с индексов, равномерно распределенных на интервале [1, 3000]. Исключим последовательности, у которых начальный индекс не попал в промежуток [1, 1000] (т.е. пустые строки).
substring_indices = np.random.randint(low=1, high=3000, size=len(genome_split))
substrings = [genome_split[i][substring_indices[i]:] for i in range(len(substring_indices))]
substrings = [x for x in substrings if len(x) > 0 ]
fig1 = sns.histplot([len(x) for x in substrings])
fig1.set_title('Substring length distribution')
Text(0.5, 1.0, 'Substring length distribution')
На этом все, остальное не успела :)