import numpy as np
import matplotlib.pyplot as plt
class HMM:
    """
    Hidden Markov Model with explicit (silent) begin and end states.

    Attributes:
        states (list): list of possible states, including begin_state and end_state
        a_lk (dict of dict): transition probability matrix,
            where a_lk[state_l][state_k] is float
        e_k (dict of dict): emission probability matrix,
            where e_k[state][letter] is float
        begin_state: begin state (silent: emits nothing, has no incoming transitions)
        end_state: end state (silent: emits nothing, has no outgoing transitions)
        alphabet: iterable of symbols the model can emit
        seq (str or other iterable): last analysed sequence
    """

    def __init__(self, states, known_a_lk, known_e_k, begin_state="B", end_state="E", alphabet="acgt"):
        """
        Initializes a HMM, inferring missing probabilities where possible.

        Parameters:
            states (list): list of possible states, including begin_state and end_state
            known_a_lk (list): list of tuples (state_l, state_k, a_lk),
                where a_lk (float) is transition probability for state_l --> state_k
            known_e_k (list): list of tuples (state_k, letter, e_k),
                where e_k (float) is emission probability for letter in state_k
            begin_state (default : "B")
            end_state (default : "E")
            alphabet (default : "acgt")

        Undefined probabilities are inferred per row: if the known entries of a
        row already sum to 1, every unknown entry becomes 0; if exactly one
        entry is unknown, it becomes the complement to 1.  Otherwise the row is
        ambiguous and an Exception is raised.
        """
        self.states = states
        self.begin_state = begin_state
        self.end_state = end_state
        self.alphabet = alphabet
        # Transition matrix: start with every entry unknown (None).
        a_lk = {state1: {state2: None for state2 in states} for state1 in states}
        # The begin state has no incoming and the end state no outgoing transitions.
        for state in states:
            a_lk[state][begin_state] = 0
            a_lk[end_state][state] = 0
        # Fill in user-supplied transition probabilities.
        for state1, state2, a in known_a_lk:
            a_lk[state1][state2] = a
        # Infer remaining transition probabilities (end state's row is all zeros already).
        for state in states:
            if state != self.end_state:
                self._complete_row(
                    a_lk[state],
                    f"Sum of out transition probabilities for {state} is not 1 and there is more than 1 undefined transition",
                )
        # If everything is consistent, save the transition probability matrix.
        self.a_lk = a_lk
        # Emission matrix: start with every entry unknown (None).
        e_k = {state: {letter: None for letter in self.alphabet} for state in states}
        # Begin and end states are silent: they emit nothing.
        for letter in self.alphabet:
            e_k[begin_state][letter] = 0
            e_k[end_state][letter] = 0
        # Fill in user-supplied emission probabilities.
        for state, letter, e in known_e_k:
            e_k[state][letter] = e
        # Infer remaining emission probabilities for the non-silent states.
        for state in states:
            if state != self.begin_state and state != self.end_state:
                self._complete_row(
                    e_k[state],
                    f"Sum of emission probabilities for {state} is not 1 and there is more than 1 undefined probability",
                )
        # If everything is consistent, save the emission probability matrix.
        self.e_k = e_k

    @staticmethod
    def _complete_row(row, error_message):
        """
        Fill the None entries of one probability row (dict) in place.

        If the defined entries already sum to 1, every unknown entry becomes 0;
        if exactly one entry is unknown, it becomes the complement to 1;
        otherwise the row is ambiguous and Exception(error_message) is raised.
        """
        known_sum = sum(p for p in row.values() if p is not None)
        unknown = [key for key, p in row.items() if p is None]
        if known_sum == 1:
            for key in unknown:
                row[key] = 0
        elif len(unknown) == 1:
            row[unknown[0]] = 1 - known_sum
        else:
            raise Exception(error_message)

    def f_b_algorithm(self, seq, eps=1e-6):
        """
        Realisation of Forward-Backward algorithm.

        Parameters:
            seq (str or other iterable): sequence of letters that might be described by current HMM
            eps (float, default : 1e-6): tolerance for the f_E(L) == b_B(0) consistency check

        Attributes:
            f_ks (list of dict): list of forward probabilities, where
                f_ks[0][begin_state] = 1
                f_ks[i][state] = P( state | seq[:i] )
                f_ks[-1][end_state] = P(seq)
            b_ks (list of dict): list of backward probabilities, where
                b_ks[-1][end_state] = 1
                b_ks[i][state] = P( seq[i:] | state)
                b_ks[0][begin_state] = P(seq)
            p_ks (list of dict): list of posterior probabilities, where
                p_ks[-1][end_state] = 1
                p_ks[i][state] = P( state | seq)
                p_ks[0][begin_state] = 1
            p_seq (float): P(seq) = f_ks[-1][end_state] = b_ks[0][begin_state]

        Raises:
            AssertionError: if forward and backward likelihoods disagree by more than eps.
        """
        self.seq = seq
        # Forward pass: f_B(0) = 1, every other state is impossible at position 0.
        self.f_ks = [{state: 0 for state in self.states}]
        self.f_ks[0][self.begin_state] = 1
        # f_k(i+1) = sum_l f_l(i) * a_lk * e_k(x_{i+1})
        for i, letter in enumerate(self.seq):
            f_k = {}
            for state in self.states:
                f_k[state] = sum(
                    self.f_ks[i][pre_state] * self.a_lk[pre_state][state] * self.e_k[state][letter]
                    for pre_state in self.states
                )
            self.f_ks.append(f_k)
        # Termination: f_E(L) = P(seq).
        f_end = {state: 0 for state in self.states}
        f_end[self.end_state] = sum(
            self.f_ks[-1][pre_state] * self.a_lk[pre_state][self.end_state]
            for pre_state in self.states
        )
        self.f_ks.append(f_end)
        # Backward pass, built in reverse order: b_E = 1.
        self.b_ks = [{state: 0 for state in self.states}]
        self.b_ks[0][self.end_state] = 1
        # b_k(L) = a_{k,E}: the end state emits nothing.
        self.b_ks.append({state: self.a_lk[state][self.end_state] for state in self.states})
        # b_k(i) = sum_l a_kl * e_l(x_{i+1}) * b_l(i+1); seq[-i] is x_{i+1} here
        # because the list is being built back to front.
        for i in range(1, len(self.seq)):
            b_k = {}
            for state in self.states:
                b_k[state] = sum(
                    self.b_ks[i][post_state] * self.a_lk[state][post_state] * self.e_k[post_state][self.seq[-i]]
                    for post_state in self.states
                )
            self.b_ks.append(b_k)
        # Bug fix: b_B(0) needs one extra step that consumes the FIRST letter.
        # (The previous implementation duplicated the last computed row, so
        # b_ks[0][begin_state] was not P(seq) and p_ks[0][begin_state] was not 1.)
        if len(self.seq) > 0:
            b_begin = {state: 0 for state in self.states}
            b_begin[self.begin_state] = sum(
                self.b_ks[-1][post_state] * self.a_lk[self.begin_state][post_state] * self.e_k[post_state][self.seq[0]]
                for post_state in self.states
            )
            self.b_ks.append(b_begin)
        self.b_ks.reverse()
        # Forward and backward must agree on the total likelihood
        # (bug fix: compare with abs(), the old signed test could never fire one way).
        assert abs(self.f_ks[-1][self.end_state] - self.b_ks[0][self.begin_state]) < eps, \
            "Something is wrong and f_E(L) != b_B(0)"
        # Likelihood of the whole sequence.
        self.p_seq = self.f_ks[-1][self.end_state]
        # Posterior probabilities: P(state at i | seq) = f_k(i) * b_k(i) / P(seq).
        self.p_ks = [
            {state: f_k[state] * b_k[state] / self.p_seq for state in self.states}
            for f_k, b_k in zip(self.f_ks, self.b_ks)
        ]

    def f_b_plot(self, states_=None, labels_=None, figsize=None):
        """
        Plot the results of Forward-Backward algorithm.

        Parameters:
            states_ (list of lists): groups of states that should be plotted as one curve.
                For example, if Site1 contains [state1, ..., statek] and Site2 contains
                [st1, ..., stk], use states_=[[state1, ..., statek], [st1, ..., stk]],
                labels_=["Site1", "Site2"].  By default each state of the HMM is
                plotted individually.
            labels_ (list): one legend label per group in states_
            figsize : (float, float), default: rcParams["figure.figsize"] (default: [6.4, 4.8]) :
                Width, height in inches.
        """
        if states_ is None:
            states_ = [[state] for state in self.states]
        if labels_ is None:
            labels_ = self.states
        assert len(states_) == len(labels_), "Number of states and number of labels must be equal"
        coords = {"position": list(range(len(self.p_ks)))}
        for label in labels_:
            coords[label] = []
        # Sum the posterior probabilities of each group at every position.
        for p_k in self.p_ks:
            for group, label in zip(states_, labels_):
                coords[label].append(sum(p_k[state] for state in group))
        plt.figure(figsize=figsize)
        plt.plot("position", *labels_, data=coords)
        plt.legend(labels_)
        # One tick per position: "_" for the begin state, each letter, " " for the end state.
        plt.xticks(ticks=coords["position"], labels=list("_" + self.seq + " "))

    def viterbi(self, seq):
        """
        Realisation of Viterbi's algorithm.

        Parameters:
            seq (str or other iterable): sequence of letters that might be described by current HMM

        Attributes:
            v_ks (list of dict): list of max likelihood of path(begin_state --> state)
            pi_ks (list of dict): list of previous states that lead to max likelihood
                of path(begin_state --> state)
            path (list): list of states with the highest likelihood of
                path(begin_state --> end_state)
        """
        self.seq = seq
        # Initialization: v_B(0) = 1, every other state impossible.
        self.v_ks = [{state: 0 for state in self.states}]
        self.v_ks[0][self.begin_state] = 1
        self.pi_ks = []
        # Recursion: v_k(i) = max_l v_l(i-1) * a_lk * e_k(x_i), remembering the argmax.
        for letter in self.seq:
            v_k = {state: 0 for state in self.states}
            pi_k = {}
            for state in self.states:
                for pre_state in self.states:
                    candidate = self.v_ks[-1][pre_state] * self.a_lk[pre_state][state] * self.e_k[state][letter]
                    if candidate > v_k[state]:
                        v_k[state] = candidate
                        pi_k[state] = pre_state
            self.v_ks.append(v_k)
            self.pi_ks.append(pi_k)
        # Termination: v_E = max_l v_l(L) * a_{l,E}.
        v_k = {state: 0 for state in self.states}
        pi_k = {}
        for pre_state in self.states:
            candidate = self.v_ks[-1][pre_state] * self.a_lk[pre_state][self.end_state]
            if candidate > v_k[self.end_state]:
                v_k[self.end_state] = candidate
                pi_k[self.end_state] = pre_state
        self.v_ks.append(v_k)
        self.pi_ks.append(pi_k)
        # Traceback from the end state
        # (bug fix: use self.end_state, not a module-level global).
        self.path = [self.end_state]
        for pi_k in reversed(self.pi_ks):
            self.path.append(pi_k[self.path[-1]])
        self.path.reverse()

    def find_site(self, site):
        """
        Locate runs of site states in the most probable path.

        Can be used only after calling HMM.viterbi() method.

        Parameters:
            site (list): list of states that are in site

        Returns:
            starts (list): list of indices where site starts in sequence
            ends (list): list of indices where site ends in sequence (exclusive,
                so seq[start:end] is the site)
        """
        starts = []
        ends = []
        curr = None
        # path[0] is begin_state, so enumerate from -1 to get sequence coordinates.
        for i, state in enumerate(self.path, -1):
            if state in site:
                if curr == "Bg":
                    starts.append(i)
                curr = "Site"
            else:
                if curr == "Site":
                    ends.append(i)
                curr = "Bg"
        assert len(starts) == len(ends), "Something wrong, amount of site's starts and ends are not equal"
        return starts, ends

    def generate(self):
        """
        Generate sequence using current HMM.

        Walks the Markov chain from begin_state, emitting one letter per state
        according to e_k, until end_state is reached.

        Returns:
            seq (str): generated sequence (may be empty if begin --> end directly)
        """
        seq = ""
        state = np.random.choice(
            list(self.a_lk[self.begin_state].keys()),
            p=list(self.a_lk[self.begin_state].values()),
        )
        while state != self.end_state:
            letter = np.random.choice(
                list(self.e_k[state].keys()), p=list(self.e_k[state].values())
            )
            seq += letter
            state = np.random.choice(
                list(self.a_lk[state].keys()), p=list(self.a_lk[state].values())
            )
        return seq
# Test sequence and model definition.
seq = "gataggattatcattcataagtttcagagcaatgtccttattctggaacttggatttatggctcttttggtttaatttcgcctgattcttgatctcctttagcttctcgacgtgggcctttttcttgccatatggatccgctgcacggtcctgttccctagcatgtacgtgagcgtatttccttttaaaccacgacgctttgtcttcattcaacgtttcccattgtttttttctactattgctttgctgtgggaaaaacttatcgaaagatgacgactttttcttaattctcgttttaagagcttggtgagcgctaggagtcactgccag"
states = ["B", "Bg", "E", "site1", "site2", "site3", "site4"]
begin_state = "B"
end_state = "E"
# Known transition probabilities (state_l, state_k, probability); the rest are inferred.
known_a_lk = [
    ("Bg", "E", 0.01),
    ("site4", "E", 0.01),
    ("Bg", "site1", 0.01),
    ("Bg", "site2", 0),
    ("Bg", "site3", 0),
    ("Bg", "site4", 0),
    ("site1", "site2", 1),
    ("site2", "site3", 1),
    ("site3", "site4", 1),
    ("site4", "site1", 0),
    ("site4", "site2", 0),
    ("site4", "site3", 0),
    ("site4", "site4", 0),
    ("B", "site2", 0),
    ("B", "site3", 0),
    ("B", "site4", 0),
    ("B", "E", 0),
    ("B", "site1", 0.01),
]
# Known emission probabilities (state, letter, probability); the site emits "acgt".
known_e_k = [
    ("site1", "a", 1),
    ("site2", "c", 1),
    ("site3", "g", 1),
    ("site4", "t", 1),
    ("Bg", "a", 0.25),
    ("Bg", "c", 0.25),
    ("Bg", "g", 0.25),
    ("Bg", "t", 0.25),
]
# Search for sites with Viterbi's algorithm.
model = HMM(states, known_a_lk, known_e_k)
model.viterbi(seq)
starts, ends = model.find_site(["site1", "site2", "site3", "site4"])
for s, e in zip(starts, ends):
    print(f"found site from {s} to {e} : {seq[s:e]}")
print(f"Total number of sites: {len(starts)}")
# Output:
#   found site from 109 to 113 : acgt
#   found site from 166 to 170 : acgt
#   found site from 212 to 216 : acgt
#   Total number of sites: 3
# Search for sites with the Forward-Backward algorithm.
model = HMM(states, known_a_lk, known_e_k)
model.f_b_algorithm(seq)
model.f_b_plot(
    states_=[["Bg"], ["site1", "site2", "site3", "site4"]],
    labels_=["Background", "Site"],
    figsize=[32, 2],
)
# All three sites are found in the test sequence.
np.random.seed(666)  # with another seed there might be no site in the generated sequence
generated_seq = model.generate()
model.f_b_algorithm(generated_seq)
model.f_b_plot(
    states_=[["Bg"], ["site1", "site2", "site3", "site4"]],
    labels_=["Background", "Site"],
    figsize=[32, 2],
)
# The generated sequence contains one site.
# Same model, but with a much lower background-to-site transition probability.
known_a_lk = [
    ("Bg", "E", 0.01),
    ("site4", "E", 0.01),
    ("Bg", "site1", 0.001),  # changed transition probability
    ("Bg", "site2", 0),
    ("Bg", "site3", 0),
    ("Bg", "site4", 0),
    ("site1", "site2", 1),
    ("site2", "site3", 1),
    ("site3", "site4", 1),
    ("site4", "site1", 0),
    ("site4", "site2", 0),
    ("site4", "site3", 0),
    ("site4", "site4", 0),
    ("B", "site2", 0),
    ("B", "site3", 0),
    ("B", "site4", 0),
    ("B", "E", 0),
    ("B", "site1", 0.01),
]
model = HMM(states, known_a_lk, known_e_k)
model.viterbi(seq)
starts, ends = model.find_site(["site1", "site2", "site3", "site4"])
for s, e in zip(starts, ends):
    print(f"found site from {s} to {e} : {seq[s:e]}")
print(f"Total number of sites: {len(starts)}")
# Output:
#   Total number of sites: 0
# Search for sites with the Forward-Backward algorithm using the modified model.
model = HMM(states, known_a_lk, known_e_k)
model.f_b_algorithm(seq)
model.f_b_plot(
    states_=[["Bg"], ["site1", "site2", "site3", "site4"]],
    labels_=["Background", "Site"],
    figsize=[32, 2],
)
# All three sites are present in the test sequence but are classified as background:
# with P(Bg --> site) = 0.001, a site in the sequence is an almost impossible event.
np.random.seed(42)  # with another seed there might be no site in the generated sequence
generated_seq = model.generate()
model.f_b_algorithm(generated_seq)
model.f_b_plot(
    states_=[["Bg"], ["site1", "site2", "site3", "site4"]],
    labels_=["Background", "Site"],
    figsize=[32, 2],
)
# The generated sequence contains one site.