Muravyov George

In [ ]:
import numpy as np
import matplotlib.pyplot as plt

![HMM state diagram](HMM.png)

In [ ]:
class HMM:
  """
  Class representing a Hidden Markov Model.

  Attributes:
      states (list): list of possible states, including begin_state and end_state
      a_lk (dict of dict): transition probability matrix,
                           where a_lk[state_l][state_k] is float
      e_k (dict of dict): emission probability matrix,
                          where e_k[state][letter] is float
      begin_state : label of the (silent) begin state
      end_state : label of the (silent) end state
      alphabet : iterable of letters the model can emit
      seq (str or other iterable): last analysed sequence
  """

  def __init__(self, states, known_a_lk, known_e_k, begin_state="B", end_state="E", alphabet="acgt"):
    """
    Initializes a HMM.

    Parameters:
        states (list): list of possible states, including begin_state and end_state
        known_a_lk (list): list of tuples (state_l, state_k, a_lk),
                           where a_lk (float) is the transition probability for state_l --> state_k
        known_e_k (list): list of tuples (state_k, letter, e_k),
                          where e_k (float) is the emission probability for letter in state_k
        begin_state (default : "B")
        end_state (default : "E")
        alphabet (default : "acgt")

    Missing probabilities are completed automatically where possible: a row whose
    known entries sum to 1 has its unknowns set to 0; a row with exactly one
    unknown entry has it set to the missing mass (1 - sum).

    Raises:
        Exception: if a row can not be completed unambiguously.
    """
    self.states = states
    self.begin_state = begin_state
    self.end_state = end_state
    self.alphabet = alphabet

    #transition matrix, initially unknown everywhere
    a_lk = {s1: {s2: None for s2 in self.states} for s1 in self.states}

    #transitions into the begin state and out of the end state are impossible
    for s in self.states:
      a_lk[s][self.begin_state] = 0
      a_lk[self.end_state][s] = 0

    #fill in all user-supplied transition probabilities
    for state1, state2, a in known_a_lk:
      a_lk[state1][state2] = a

    #complete every outgoing row (the end state has no outgoing transitions, so skip it)
    for state1 in self.states:
      if state1 != self.end_state:
        self._fill_row(a_lk[state1], f"Sum of out transition probabilities for {state1} is not 1 and there is more than 1 undefined transition")

    #if everything is ok, save transition probabilities matrix
    self.a_lk = a_lk

    #emission matrix, initially unknown everywhere
    e_k = {s: {letter: None for letter in self.alphabet} for s in self.states}

    #begin and end states are silent: they emit nothing
    for letter in self.alphabet:
      e_k[self.begin_state][letter] = 0
      e_k[self.end_state][letter] = 0

    #fill in all user-supplied emission probabilities
    for state, letter, e in known_e_k:
      e_k[state][letter] = e

    #complete every emitting state's row
    for state in self.states:
      if state != self.begin_state and state != self.end_state:
        self._fill_row(e_k[state], f"Sum of emission probabilities for {state} is not 1 and there is more than 1 undefined probability")

    #if everything is ok, save emission probabilities matrix
    self.e_k = e_k

  @staticmethod
  def _fill_row(row, err_msg):
    """
    Complete one probability row (dict key -> float or None) in place.

    If the known entries already sum to 1, every unknown entry is set to 0;
    if exactly one entry is unknown, it receives the missing mass (1 - sum);
    otherwise the row is ambiguous and Exception(err_msg) is raised.
    """
    known_sum = sum(v for v in row.values() if v is not None)
    if known_sum == 1:
      for key in row:
        if row[key] is None:
          row[key] = 0
    elif sum(v is None for v in row.values()) == 1:
      for key in row:
        if row[key] is None:
          row[key] = 1 - known_sum
    else:
      raise Exception(err_msg)

  def f_b_algorithm(self, seq, eps=1e-6):
    """
    Realisation of the Forward-Backward algorithm.

    Parameters:
        seq (str or other iterable): sequence of letters that might be described by current HMM
        eps (float, default : 1e-6): tolerance for the forward/backward consistency check

    Attributes:
        f_ks (list of dict): forward probabilities, where
                             f_ks[0][begin_state] = 1
                             f_ks[i][state] = P( seq[:i], state at position i )
                             f_ks[-1][end_state] = P(seq)
        b_ks (list of dict): backward probabilities, where
                             b_ks[-1][end_state] = 1
                             b_ks[i][state] = P( seq[i:] | state at position i )
                             b_ks[0][begin_state] = P(seq)
        p_ks (list of dict): posterior probabilities, where
                             p_ks[i][state] = P( state at position i | seq )
        p_seq (float): P(seq) = f_ks[-1][end_state] = b_ks[0][begin_state]
    """
    self.seq = seq
    n = len(self.seq)

    #forward initialization: f_B(0) = 1
    f_k = {state: 0 for state in self.states}
    f_k[self.begin_state] = 1
    self.f_ks = [f_k]

    #forward recursion: f_k(i) = e_k(x_i) * sum_l f_l(i-1) * a_lk
    for i in range(n):
      letter = self.seq[i]
      f_k = {state: sum(self.f_ks[i][pre] * self.a_lk[pre][state] * self.e_k[state][letter]
                        for pre in self.states)
             for state in self.states}
      self.f_ks.append(f_k)

    #forward termination: f_E(L) = sum_l f_l(L) * a_lE
    f_k = {state: 0 for state in self.states}
    f_k[self.end_state] = sum(self.f_ks[-1][pre] * self.a_lk[pre][self.end_state] for pre in self.states)
    self.f_ks.append(f_k)

    #backward pass, built from the end and reversed afterwards: b_E(L) = 1
    b_k = {state: 0 for state in self.states}
    b_k[self.end_state] = 1
    self.b_ks = [b_k]

    #b_k(L) = a_kE  (bug fix: original referenced the global name `end_state` here)
    b_k = {state: self.b_ks[0][self.end_state] * self.a_lk[state][self.end_state] for state in self.states}
    self.b_ks.append(b_k)

    #backward recursion: b_k(i) = sum_l a_kl * e_l(x_{i+1}) * b_l(i+1)
    for i in range(1, n):
      letter = self.seq[-i]
      b_k = {state: sum(self.b_ks[i][post] * self.a_lk[state][post] * self.e_k[post][letter]
                        for post in self.states)
             for state in self.states}
      self.b_ks.append(b_k)

    #backward termination: b_B(0) = sum_l a_Bl * e_l(x_1) * b_l(1)
    #(bug fix: the original appended a duplicate of b(1) here instead of computing b_B(0))
    if n > 0:
      b_k = {state: 0 for state in self.states}
      b_k[self.begin_state] = sum(self.b_ks[-1][post] * self.a_lk[self.begin_state][post] * self.e_k[post][self.seq[0]]
                                  for post in self.states)
      self.b_ks.append(b_k)
    self.b_ks.reverse()

    #both directions must yield the same total likelihood
    #(bug fix: the original compared the signed difference, which passes whenever b > f)
    assert abs(self.f_ks[-1][self.end_state] - self.b_ks[0][self.begin_state]) < eps, "Something is wrong and f_E(L) != b_B(0)"

    #likelihood of the whole sequence
    self.p_seq = self.f_ks[-1][self.end_state]

    #posterior probabilities: p_k(i) = f_k(i) * b_k(i) / P(seq)
    #(bug fix: the original iterated over the global name `states`)
    self.p_ks = [{state: f_k[state] * b_k[state] / self.p_seq for state in self.states}
                 for f_k, b_k in zip(self.f_ks, self.b_ks)]

  def f_b_plot(self, states_=None, labels_=None, figsize=None):
    """
    Plot posterior state probabilities computed by the Forward-Backward algorithm.

    Can be used only after calling HMM.f_b_algorithm().

    Parameters:
        states_ (list of lists): groups of states that should be plotted as one curve;
                                 by default each state of the HMM is plotted individually.
                                 For example, if Site1 contains [state1, ..., statek] and
                                 Site2 contains [st1, ..., stk], you can use
                                 states_ = [[state1, ..., statek], [st1, ..., stk]],
                                 labels_ = [Site1, Site2]
        labels_ (list): one label per group in states_
        figsize : (float, float), default: rcParams["figure.figsize"] : width, height in inches
    """
    if states_ is None:
      states_ = [[state] for state in self.states]
    if labels_ is None:
      labels_ = self.states
    assert len(states_) == len(labels_), "Number of states and number of labels must be equal"

    #one x coordinate per position (begin, each letter, end) and one curve per group
    coords = {"position": list(range(len(self.p_ks)))}
    for label in labels_:
      coords[label] = []
    for p_k in self.p_ks:
      for group, label in zip(states_, labels_):
        coords[label].append(sum(p_k[state] for state in group))
    plt.figure(figsize=figsize)
    plt.plot("position", *labels_, data=coords)
    plt.legend(labels_)
    #tick labels: "_" for the begin position, the sequence letters, " " for the end position
    plt.xticks(ticks=coords["position"], labels=list("_" + self.seq + " "))

  def viterbi(self, seq):
    """
    Realisation of Viterbi's algorithm.

    Parameters:
        seq (str or other iterable): sequence of letters that might be described by current HMM

    Attributes:
        v_ks (list of dict): max likelihood of a path begin_state --> state
        pi_ks (list of dict): previous state on the best path into each state
        path (list): most likely state path begin_state --> end_state
    """
    self.seq = seq

    #initialization: v_B(0) = 1
    self.v_ks = []
    self.pi_ks = []
    v_k = {state: 0 for state in self.states}
    v_k[self.begin_state] = 1
    self.v_ks.append(v_k)

    #recursion: v_k(i) = e_k(x_i) * max_l v_l(i-1) * a_lk, remembering the argmax in pi_k
    for letter in self.seq:
      v_k = {state: 0 for state in self.states}
      pi_k = {}
      for state in self.states:
        for pre in self.states:
          candidate = self.v_ks[-1][pre] * self.a_lk[pre][state] * self.e_k[state][letter]
          if candidate > v_k[state]:
            v_k[state] = candidate
            pi_k[state] = pre
      self.v_ks.append(v_k)
      self.pi_ks.append(pi_k)

    #termination: v_E = max_l v_l(L) * a_lE
    v_k = {state: 0 for state in self.states}
    pi_k = {}
    for pre in self.states:
      candidate = self.v_ks[-1][pre] * self.a_lk[pre][self.end_state]
      if candidate > v_k[self.end_state]:
        v_k[self.end_state] = candidate
        pi_k[self.end_state] = pre
    self.v_ks.append(v_k)
    self.pi_ks.append(pi_k)

    #traceback from the end state (bug fix: original referenced the global name `end_state`)
    self.path = [self.end_state]
    for pi_k in reversed(self.pi_ks):
      self.path.append(pi_k[self.path[-1]])
    self.path.reverse()

  def find_site(self, site):
    """
    Locate runs of site states in the decoded path.

    Can be used only after calling HMM.viterbi() method.

    Parameters:
        site (list): list of states that belong to the site

    Returns:
        starts (list): list of indices where the site starts in the sequence
        ends (list): list of indices just past where the site ends (usable as seq[start:end])
    """
    starts = []
    ends = []
    curr = None
    #path[0] is the begin state, so enumeration starts at -1 to align path with sequence indices
    for i, state in enumerate(self.path, -1):
      if state in site:
        if curr == "Bg":
          starts.append(i)
        curr = "Site"
      else:
        if curr == "Site":
          ends.append(i)
        curr = "Bg"

    #the path ends with the (non-site) end state, so every started site is also closed
    assert len(starts) == len(ends), "Something wrong, amount of site's starts and ends are not equal"
    return starts, ends

  def generate(self):
    """
    Generate a random sequence from the current HMM.

    Returns:
        seq (str): generated sequence
    """
    seq = ""
    #dict insertion order is stable, so keys() and values() stay aligned for np.random.choice
    state = np.random.choice(list(self.a_lk[self.begin_state].keys()), p=list(self.a_lk[self.begin_state].values()))
    while state != self.end_state:
      letter = np.random.choice(list(self.e_k[state].keys()), p=list(self.e_k[state].values()))
      seq += letter
      state = np.random.choice(list(self.a_lk[state].keys()), p=list(self.a_lk[state].values()))
    return seq
In [ ]:
#test DNA sequence; it contains three exact occurrences of the "acgt" site
seq = "gataggattatcattcataagtttcagagcaatgtccttattctggaacttggatttatggctcttttggtttaatttcgcctgattcttgatctcctttagcttctcgacgtgggcctttttcttgccatatggatccgctgcacggtcctgttccctagcatgtacgtgagcgtatttccttttaaaccacgacgctttgtcttcattcaacgtttcccattgtttttttctactattgctttgctgtgggaaaaacttatcgaaagatgacgactttttcttaattctcgttttaagagcttggtgagcgctaggagtcactgccag"

Model with a(Bg → site)=0.01

In [ ]:
#model topology: background state Bg plus a linear chain site1 -> site2 -> site3 -> site4 (the "acgt" site)
states = ["B", "Bg", "E", "site1", "site2", "site3", "site4"]
begin_state = "B"
end_state = "E"
#known transition probabilities; the remaining ones (e.g. Bg -> Bg, B -> Bg) are inferred by HMM.__init__
known_a_lk = [("Bg", "E", 0.01),
              ("site4", "E", 0.01),
              ("Bg", "site1", 0.01),
              ("Bg", "site2", 0),
              ("Bg", "site3", 0),
              ("Bg", "site4", 0),
              ("site1", "site2", 1),
              ("site2", "site3", 1),
              ("site3", "site4", 1),
              ("site4", "site1", 0),
              ("site4", "site2", 0),
              ("site4", "site3", 0),
              ("site4", "site4", 0),
              ("B", "site2", 0),
              ("B", "site3", 0),
              ("B", "site4", 0),
              ("B", "E", 0),
              ("B", "site1", 0.01)]
#each site state deterministically emits one letter of "acgt"; the background emits uniformly
known_e_k = [("site1", "a", 1),
             ("site2", "c", 1),
             ("site3", "g", 1),
             ("site4", "t", 1),
             ("Bg", "a", 0.25),
             ("Bg", "c", 0.25),
             ("Bg", "g", 0.25),
             ("Bg", "t", 0.25)]
In [ ]:
#searching sites with Viterbi's Algorithm
model = HMM(states, known_a_lk, known_e_k)
model.viterbi(seq)
starts, ends = model.find_site(["site1", "site2", "site3", "site4"])
for s, e in zip(starts, ends):
  print(f"found site from {s} to {e} : {seq[s:e]}")
#fix: the original used for/else, but `else` on a loop without `break` always runs
print(f"Total number of sites: {len(starts)}")
found site from 109 to 113 : acgt
found site from 166 to 170 : acgt
found site from 212 to 216 : acgt
Total number of sites: 3
In [ ]:
#searching sites with Forward-Backward Algorithm
model = HMM(states, known_a_lk, known_e_k)
model.f_b_algorithm(seq)
#plot background vs. the four site states merged into a single curve
site_states = ["site1", "site2", "site3", "site4"]
model.f_b_plot(states_=[["Bg"], site_states],
               labels_=["Background", "Site"],
               figsize=[32, 2])
#all three sites are found in test sequence
#all three sites are found in test sequence
In [ ]:
#fixed seed: with another seed there might be no site in the generated sequence
np.random.seed(666)
generated_seq = model.generate()
model.f_b_algorithm(generated_seq)
#plot background vs. the four site states merged into a single curve
site_states = ["site1", "site2", "site3", "site4"]
model.f_b_plot(states_=[["Bg"], site_states],
               labels_=["Background", "Site"],
               figsize=[32, 2])
#generated sequence contains one site

Model with a(Bg → site)=0.001

In [ ]:
#same topology as before, but entering the site is 10x less likely
known_a_lk = [("Bg", "E", 0.01),
              ("site4", "E", 0.01),
              ("Bg", "site1", 0.001), #changed transition probability
              ("Bg", "site2", 0),
              ("Bg", "site3", 0),
              ("Bg", "site4", 0),
              ("site1", "site2", 1),
              ("site2", "site3", 1),
              ("site3", "site4", 1),
              ("site4", "site1", 0),
              ("site4", "site2", 0),
              ("site4", "site3", 0),
              ("site4", "site4", 0),
              ("B", "site2", 0),
              ("B", "site3", 0),
              ("B", "site4", 0),
              ("B", "E", 0),
              ("B", "site1", 0.01)]
In [ ]:
#searching sites with Viterbi's Algorithm under the lower site-entry probability
model = HMM(states, known_a_lk, known_e_k)
model.viterbi(seq)
starts, ends = model.find_site(["site1", "site2", "site3", "site4"])
for s, e in zip(starts, ends):
  print(f"found site from {s} to {e} : {seq[s:e]}")
#fix: the original used for/else, but `else` on a loop without `break` always runs
print(f"Total number of sites: {len(starts)}")
Total number of sites: 0
In [ ]:
#searching sites with Forward-Backward Algorithm
#searching sites with Forward-Backward Algorithm
model = HMM(states, known_a_lk, known_e_k)
model.f_b_algorithm(seq)
#plot background vs. the four site states merged into a single curve
site_states = ["site1", "site2", "site3", "site4"]
model.f_b_plot(states_=[["Bg"], site_states],
               labels_=["Background", "Site"],
               figsize=[32, 2])
#all three sites are found in test sequence, but they are determind as background. Since P(Bg --> site) = 0.001, presence of site in sequence is almost impossible event
In [ ]:
#fixed seed: with another seed there might be no site in the generated sequence
np.random.seed(42)
generated_seq = model.generate()
model.f_b_algorithm(generated_seq)
#plot background vs. the four site states merged into a single curve
site_states = ["site1", "site2", "site3", "site4"]
model.f_b_plot(states_=[["Bg"], site_states],
               labels_=["Background", "Site"],
               figsize=[32, 2])
#generated sequence contains one site