Review of the First-Order HMM

In a first-order HMM, the next hidden state depends only on the current hidden state:
$$ P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k, \cdots ) \approx P(q_{t+1} = S_j | q_t = S_i) $$

That is, the transition matrix:
$$ A = [a_{ij}] \quad \text{where} \quad a_{ij} \equiv P(q_{t+1} = S_j | q_t = S_i) $$

And the observation at any given time depends only on the current hidden state:
$$ b_j(m) \equiv P(O_t = \nu_m | q_t = S_j) $$

That is, the observation matrix:
$$ B = [b_j(m)] \quad \text{where} \quad b_j(m) \equiv P(O_t = \nu_m | q_t = S_j) $$
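For character-level segmentation with the B/M/E/S tag set these parameters are just small tables. A minimal sketch of their shapes, with made-up placeholder probabilities rather than trained values:

states = ['B', 'M', 'E', 'S']

# pi[i]: probability of a sentence starting in state i (only B or S can start a word)
pi = {'B': 0.6, 'S': 0.4, 'M': 0.0, 'E': 0.0}

# A[i][j] = P(q_{t+1} = j | q_t = i); rows sum to 1, and transitions that are
# impossible for B/M/E/S (e.g. B->B, E->E) are simply absent
A = {
    'B': {'M': 0.3, 'E': 0.7},
    'M': {'M': 0.4, 'E': 0.6},
    'E': {'B': 0.5, 'S': 0.5},
    'S': {'B': 0.5, 'S': 0.5},
}

# B[j][char] = P(O_t = char | q_t = j): one distribution over characters per tag
B = {'B': {'中': 0.01}, 'M': {'华': 0.01}, 'E': {'国': 0.01}, 'S': {'了': 0.02}}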

Second-Order HMM

For tagging a character sequence for segmentation, the current tag depends on the previous character's tag; this is a bigram model. But examining samples shows that a split position does not necessarily depend only on the previous character; there may be longer-range dependencies. If we assume the current character's tag depends on the tags of the previous two characters, we obtain a trigram model:

$$ P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k, \cdots ) \approx P(q_{t+1} = S_j | q_t = S_i, q_{t-1} = S_k) $$

We further assume that the currently observed character depends not only on the current tag (hidden state) but also on the previous hidden state:

$$ b_{ij}(m) \equiv P(O_t = \nu_m | q_t = S_j, q_{t-1} = S_i) $$

The transition and observation matrices change accordingly:

$$ \begin{align} A &= [a_{ijk}] \quad \text{where} \quad a_{ijk} \equiv P(q_{t+1} = S_k | q_t = S_j , q_{t-1} = S_i ) \\ B &= [b_{ij}(m)] \quad \text{where} \quad b_{ij}(m) \equiv P(O_t = \nu_m | q_t = S_j , q_{t-1} = S_i) \end{align} $$
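In code these become one extra level of nesting compared with the first-order tables. A sketch of the shapes, matching the `A2`/`B2` dicts used in the implementation below (raw counts shown, before conversion to log probabilities):

# a_ijk = P(q_{t+1} = k | q_t = j, q_{t-1} = i): nested dict A2[i][j][k]
A2 = {'B': {'M': {'M': 0, 'E': 0},
            'E': {'B': 0, 'S': 0}}}

# b_ij(m) = P(O_t = m | q_t = j, q_{t-1} = i): nested dict B2[i][j][char]
B2 = {'B': {'E': {'国': 0}},
      'E': {'B': {'中': 0}}}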

Second-Order Viterbi

Each step of the computation now involves two consecutive states, so the Viterbi recursion changes accordingly (a minimal end-to-end sketch follows step 4 below):

$$ \begin{align} \delta_t(i, j) & \equiv \max_{q_1 q_2 \cdots q_{t-2}} P( q_1 q_2 \cdots q_{t-2}, q_{t-1}=S_i, q_t = S_j, O_1 \cdots O_t | \lambda ) \\ \psi_t(i, j) & \equiv \arg\max_{q_1 q_2 \cdots q_{t-2}} P( q_1 q_2 \cdots q_{t-2}, q_{t-1}=S_i, q_t = S_j, O_1 \cdots O_t | \lambda ) \end{align} $$

  1. Initialization
    The second-order conditions do not hold yet at the start, so we keep using the first-order initial/transition matrices; at the first step the parameter $ i $ is ignored, and the extended observation matrix is not used. Equivalently, one can regard the event $ i $ as always having probability 1 here:

    $$ \begin{align} \text{when} \quad t=1 \text{:} \qquad & \\ \delta_1(i, j) & = \pi_j b_j(O_1) \\ \psi_1(i,j) & = 0 \\ \text{when} \quad t=2 \text{:} \qquad & \\ \delta_2(i, j) & = \delta_1(\cdot, i) \cdot a_{ij} \cdot b_{ij}(O_2) \\ \psi_2(i,j) & = 0 \end{align} $$

    Hmm, this seems a bit fiddly; let's just merge the two cases and initialize directly at t=2, computing $ \psi $ from t=3 onward:

    $$ \begin{align} \delta_2(i, j) & = \pi_i b_i(O_1) \cdot a_{ij} \cdot b_{ij}(O_2) \\ \psi_2(i,j) & = 0 \end{align} $$

  2. Recursion

    $$ \begin{align} \delta_t(j, k) & = \max_i \delta_{t-1}(i, j) \cdot a_{ijk} \cdot b_{jk}(O_t) \\ \psi_t(j, k) & = \arg\max_i \delta_{t-1}(i, j) \cdot a_{ijk} \end{align} $$

  3. Termination

    $$ \begin{align} p^* & = \max_{i,j} \delta_T(i, j) \\ (q^*_{T-1}, q^*_{T}) & = \arg\max_{i,j} \delta_T(i, j) \end{align} $$

  4. Backtracking the state sequence path
    $$ q^*_t = \psi_{t+2}(q^*_{t+1}, q^*_{t+2}), t = T-2, T-3, \cdots, 1 $$
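Putting the four steps together, here is a minimal end-to-end sketch of the procedure in log space. The containers `pi`, `a1`, `b1` and the callables `a2`, `b2` are hypothetical stand-ins for the (smoothed) model parameters; the full implementation appears in the code section below.

def viterbi2_sketch(obs, states, pi, a1, b1, a2, b2):
    """Second-order Viterbi in log space.

    pi[i]       : log initial probability of state i
    a1[i][j]    : first-order log transition (only used at t=2)
    b1[i][o]    : first-order log emission (only used for O_1)
    a2(i, j, k) : second-order log transition, log P(k | i, j)
    b2(i, j, o) : second-order log emission,  log P(o | i, j)
    """
    T = len(obs)
    assert T >= 2
    # initialization at t=2: delta_2(i, j) = pi_i + b_i(O_1) + a_ij + b_ij(O_2)
    delta = {(i, j): pi[i] + b1[i][obs[0]] + a1[i][j] + b2(i, j, obs[1])
             for i in states for j in states}
    backpointers = []  # one table per time step t = 3 .. T
    for t in range(2, T):
        prev, delta, back = delta, {}, {}
        for j in states:
            for k in states:
                # best predecessor i for the state pair (j, k)
                score, best = max((prev[(i, j)] + a2(i, j, k), i) for i in states)
                delta[(j, k)] = score + b2(j, k, obs[t])
                back[(j, k)] = best
        backpointers.append(back)
    # termination: joint argmax over the last two states
    _, (q_prev, q_last) = max((v, jk) for jk, v in delta.items())
    path = [q_prev, q_last]
    # backtracking: psi_{t+2}(q_{t+1}, q_{t+2}) recovers q_t
    for back in reversed(backpointers):
        path.insert(0, back[(path[0], path[1])])
    return path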

Smoothing

In the second-order HMM we may not observe every $ a_{jk}, a_{ijk}, b_{ij}(m), b_j(m) $ parameter in the training sample, so as in the first-order HMM there can be zero-probability state sequences. To solve this, the transition and observation matrices are smoothed so that probabilities of new samples can be estimated at decoding time.

Smoothing the Transition Matrix

The smoothing method proposed by Brants in "TnT: A Statistical Part-of-Speech Tagger" [1]:

$$ p(t_3 | t_2, t_1) = \lambda_1 \hat{p}(t_3) + \lambda_2 \hat{p}(t_3 | t_2) + \lambda_3 \hat{p}(t_3 | t_2, t_1) $$

With coefficients satisfying $ \lambda_1 + \lambda_2 + \lambda_3 = 1 $, $ p $ is expressed as a linear combination of the three probability estimates. The coefficients are computed as follows:

$$ \begin{align} & \text{set} \quad \lambda_1 = \lambda_2 = \lambda_3 = 0 \\ & \text{foreach trigram} \quad t_1, t_2, t_3 \quad \text{with} \quad f(t_1,t_2,t_3) > 0 \\ & \qquad \text{depending on the maximum of the following three values:} \\ & \qquad \qquad \text{case} \quad \frac{f(t_1,t_2,t_3)-1}{f(t_1,t_2)-1} \text{:} \qquad \text{increment} \quad \lambda_3 \quad \text{by} \quad f(t_1,t_2,t_3) \\ & \qquad \qquad \text{case} \quad \frac{f(t_2,t_3)-1}{f(t_2)-1} \text{:} \qquad \text{increment} \quad \lambda_2 \quad \text{by} \quad f(t_1,t_2,t_3) \\ & \qquad \qquad \text{case} \quad \frac{f(t_3)-1}{N-1} \text{:} \qquad \text{increment} \quad \lambda_1 \quad \text{by} \quad f(t_1,t_2,t_3) \\ & \qquad \text{end} \\ & \text{end} \\ & \text{normalize} \quad \lambda_1,\lambda_2,\lambda_3 \end{align} $$

If any denominator above is 0, treat the corresponding value as 0.
The algorithm simply accumulates each trigram's count into whichever of the trigram, bigram, or unigram estimates is most reliable for it, then normalizes those totals into proportions, expressing the trigram probability as a linear interpolation. Subtracting 1 from numerator and denominator removes the current occurrence from the counts (deleted interpolation), which simulates n-grams that were never observed in the sample.
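As a concrete illustration, here is a minimal sketch of that coefficient computation, assuming the n-gram counts live in plain dicts; `f3`, `f2`, `f1` and the total count `N` are hypothetical containers, not part of the original code:

def tnt_lambdas(f3, f2, f1, N):
    """Deleted-interpolation weights per Brants (2000).

    f3[(t1,t2,t3)], f2[(t1,t2)], f1[t1] are raw tag n-gram counts;
    N is the total number of tags in the training data.
    """
    lam = [0.0, 0.0, 0.0]  # accumulators for lambda_1, lambda_2, lambda_3

    def ratio(num, den):
        return num / den if den > 0 else 0.0  # zero denominator -> 0

    for (t1, t2, t3), f in f3.items():
        cases = [
            ratio(f1[t3] - 1, N - 1),             # unigram estimate -> lambda_1
            ratio(f2[(t2, t3)] - 1, f1[t2] - 1),  # bigram estimate  -> lambda_2
            ratio(f - 1, f2[(t1, t2)] - 1),       # trigram estimate -> lambda_3
        ]
        lam[max(range(3), key=lambda n: cases[n])] += f
    total = sum(lam)
    return [x / total for x in lam]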

Training the coefficients by gradient descent might actually work better here, or one could accumulate ratios over totals instead; in a real project several coefficient schemes should be tried and their smoothing quality compared. And since character-level segmentation uses only the four tags B/M/E/S, with even a modestly larger sample the transition matrix needs no smoothing at all.

Smoothing the Observation Probability Matrix

The zero-probability problem is not limited to the transition probabilities. Because the observation model now conditions on two consecutive hidden states, a character may have zero probability under some pair of tags (with as many as 80,000 Chinese characters, some are bound to be missing from the sample). Smoothing here imitates the transition-probability smoothing, but for characters that never appear in the training data at all we fall back to another scheme, e.g. the simplest add-one smoothing.
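A sketch of the resulting emission lookup, mirroring the `_B2` helper in the code below. `lam` holds the TnT weights, and `p_char`, `p1`, `p2` are hypothetical add-one-smoothed log-probability tables, each carrying an `<UNK>` entry for unseen characters. Note that, like the reference code, this interpolates log probabilities directly, which amounts to a weighted geometric rather than arithmetic mean of the three estimates:

def emission_logprob(i, j, o, lam, p_char, p1, p2):
    """Interpolated log emission score for character o given tags (i, j)."""
    return (lam[0] * p_char.get(o, p_char['<UNK>'])         # unigram P(o)
            + lam[1] * p1[j].get(o, p1[j]['<UNK>'])         # P(o | j)
            + lam[2] * p2[i][j].get(o, p2[i][j]['<UNK>']))  # P(o | i, j)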

Code Implementation and Results

The code below only validates the algorithm; neither its structure nor its performance is optimized. Since the transition counts had no sampling gaps, the transition matrix is not smoothed; only the observation matrix gets TnT and add-one smoothing.

# -*- coding: utf-8 -*-
import sys, re, math
import numpy as np

# Default pku corpus paths, appended so the script can be run without
# command-line arguments; remove these lines to pass the paths yourself.
sys.argv.append('./gold/pku_training_words.utf8')
sys.argv.append('./training/pku_training.utf8')
sys.argv.append('./testing/pku_test.utf8')

assert len(sys.argv) == 4

training_words_filename = sys.argv[1]
training_filename = sys.argv[2]
test_filename = sys.argv[3]

with open(training_words_filename, 'rt', encoding='utf8') as f:
    training_words = f.readlines()

with open(training_filename, 'rt', encoding='utf8') as f:
    training = f.readlines()
    
with open(test_filename, 'rt', encoding='utf8') as f:
    test = f.readlines()

# training += training_words
# First pass: tag every character with B/M/E/S and collect first-order counts.
hidden_state = ['B','M','E','S']
A, B, P = {}, {}, {}    # transition, emission, and initial-state counts
_N = 0                  # total number of characters seen in training
_O = {}                 # raw character (unigram) frequencies
for line in training:
    #print( line )
    prev_a = None
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        _N += I
        for i, c in enumerate(word):
            _O[c] = _O.get(c, 0) + 1
            if I == 1:
                a = 'S'
            else:
                if i == 0:
                    a = 'B'
                elif i == I-1:
                    a = 'E'
                else:
                    a = 'M'
            # print(w, i, c, a)
            if prev_a is None: # calculate Initial state Number
                if a not in P: P[a] = 0
                P[a] += 1
            else: # calculate State transition Number
                if prev_a not in A: A[prev_a] = {}
                if a not in A[prev_a]: A[prev_a][a] = 0
                A[prev_a][a] += 1
            prev_a = a
            # calculate Observation Number
            if a not in B: B[a] = {}
            if c not in B[a]: B[a][c] = 0
            B[a][c] += 1
_B = B.copy()  # keep raw emission counts; a shallow copy suffices since B's values are rebound (not mutated) below
# calculate probability
for k, v in A.items():
    total = sum(v.values())
    A[k] = dict([(x, math.log(y / total)) for x, y in v.items()])
for k, v in B.items():
    # add-one smoothing over the characters seen with tag k
    total = sum(v.values())
    V = len(v.values())
    B[k] = dict([(x, math.log((y+1.0) / (total+V))) for x, y in v.items()])
    # reserve probability mass for characters unseen with tag k
    B[k]['<UNK>'] = math.log(1.0 / (total+V))
minlog = math.log( sys.float_info.min )
total = sum(P.values())
for k, v in P.items():
    P[k] = math.log( v / total )

# Second pass: trigram tag counts (A2) and second-order emission counts (B2).
A2, B2 = {}, {}
for line in training:
    temptags = []
    tempsent = []
    for w, word in enumerate(re.split(r'\s{2}', line)):
        I = len(word)
        for i, c in enumerate(word):
            if I == 1:
                a = 'S'
            else:
                if i == 0:
                    a = 'B'
                elif i == I-1:
                    a = 'E'
                else:
                    a = 'M'
            temptags.append(a)
            tempsent.append(c)
            if len(temptags) >= 3:
                if temptags[-3] not in A2: A2[temptags[-3]] = {}
                if temptags[-2] not in A2[temptags[-3]]: A2[temptags[-3]][temptags[-2]] = {}
                if temptags[-1] not in A2[temptags[-3]][temptags[-2]]: A2[temptags[-3]][temptags[-2]][temptags[-1]] = 0
                A2[temptags[-3]][temptags[-2]][temptags[-1]] += 1
            if len(temptags) >= 2:
                if temptags[-2] not in B2: B2[temptags[-2]] = {}
                if temptags[-1] not in B2[temptags[-2]]: B2[temptags[-2]][temptags[-1]] = {}
                if tempsent[-1] not in B2[temptags[-2]][temptags[-1]]: B2[temptags[-2]][temptags[-1]][tempsent[-1]] = 0
                B2[temptags[-2]][temptags[-1]][tempsent[-1]] += 1
    #print( temptags, tempsent )
    #break

# convert A2 counts to log probabilities
for i in A2:
    for j in A2[i]:
        total = sum([A2[i][j][k] for k in A2[i][j]])
        for k in A2[i][j]:
            A2[i][j][k] = math.log( A2[i][j][k] / total )
_A = np.array([0., 0., 0.])  # TnT coefficient accumulators (lambda_1, lambda_2, lambda_3)
for i in B2:
    for j in B2[i]:
        total = sum([B2[i][j][k] for k in B2[i][j]])
        V = len( B2[i][j] )
        for k in B2[i][j]:
            # TnT deleted interpolation: credit this trigram's count to the
            # most reliable of the unigram / bigram / trigram estimates
            a3 = ((B2[i][j][k] - 1.) / (total - 1.)) if total > 1 else 0
            a2 = (_B[j][k] - 1.) / ( sum([_B[j][n] for n in _B[j]]) - 1. )
            a1 = (_O[k] - 1.) / (_N - 1.)
            _A[np.argmax([a1,a2,a3])] += B2[i][j][k]
            # second-order emission log probability (add-one smoothing)
            B2[i][j][k] = math.log( ( B2[i][j][k] + 1 ) / (total + V) )
        B2[i][j]['<UNK>'] = math.log( 1.0 / (total + V) )

# normalize the TnT coefficients so that they sum to 1
_A = _A / _A.sum()

# add-one smoothed character (unigram) log frequencies
for o in _O:
    _O[o] = math.log( (_O[o] + 1.0) / (_N + len(_O)) )
_O['<UNK>'] = math.log( 1.0 / (_N + len(_O)) )

def viterbi2(observation):
    # tag bigrams that can never occur in a valid B/M/E/S sequence
    impossible = ('BB', 'MB', 'SM', 'EM', 'EE', 'SE', 'BS', 'MS')

    def _A2(i, j, k):
        # second-order log transition probability; forbid invalid tag bigrams
        key = ''.join([i, j, k])
        if any(p in key for p in impossible):
            return minlog
        return A2.get(i, {}).get(j, {}).get(k, minlog)

    def _B2(i, j, o):
        # interpolated log emission: unigram / first-order / second-order,
        # weighted by the TnT coefficients in _A
        if ''.join([i, j]) in impossible:
            return minlog
        return _A[0] * _O.get(o, _O['<UNK>']) + _A[1] * B[j].get(o, B[j]['<UNK>']) + _A[2] * B2[i][j].get(o, B2[i][j]['<UNK>'])
    
    state = ['B','M','E','S']
    T = len(observation)
    delta = [None] * (T + 1)
    psi = [None] * (T + 2)
    # initialization at t=2: delta_2(i,j) = pi_i * b_i(O_1) * a_ij * b_ij(O_2), in logs
    for i in state:
        if delta[1] is None: delta[1] = {}
        if i not in delta[1]: delta[1][i] = {}
        for j in state:
            delta[1][i][j] = P.get(i, minlog) + B[i].get(observation[0], B[i]['<UNK>']) + A[i].get(j, minlog) + _B2(i, j, observation[1])
    # recursion for t = 3 .. T (delta[t] covers observations 0..t)
    for t in range(2, T):
        Ot = observation[t]
        if delta[t] is None: delta[t] = {}
        if psi[t] is None: psi[t] = {}
        for j in state:
            if j not in delta[t]: delta[t][j] = {}
            if j not in psi[t]: psi[t][j] = {}
            for k in state:
                delta[t][j][k], psi[t][j][k] = max( [ (delta[t-1][i][j] + _A2(i,j,k) + _B2(j,k,Ot), i) for i in state] )
    # termination: joint argmax over the last two states
    delta[T], (psi[T], psi[T+1]) = max( [ (delta[T-1][i][j], (i, j)) for i in state for j in state] )
    # backtracking: q[t] = psi[t][q[t+1]][q[t+2]]
    q = [None] * (T+2)
    q[T+1] = psi[T+1]
    q[T] = psi[T]
    for t in range(T-1, 1, -1):
        q[t] = psi[t][q[t+1]][q[t+2]]
    return q[2:]

# Decode the test set and print two-space-separated segmentations.
for sent in test:
    if len(sent) < 2: # viterbi2 needs at least two observations
        print(sent, end='')
        continue
    sequence = viterbi2( list(sent) )
    segment = []
    for char, tag in zip(sent, sequence):
        if tag in ('B', 'S'):   # B and S start a new word
            segment.append(char)
        elif tag in ('M', 'E'): # M and E extend the current word
            segment[-1] += char
        else:
            raise Exception()
    print('  '.join(segment), end='')

Training and evaluation use the pku corpus:

=== SUMMARY:
=== TOTAL INSERTIONS:   3921
=== TOTAL DELETIONS:    7163
=== TOTAL SUBSTITUTIONS:    13864
=== TOTAL NCHANGE:  24948
=== TOTAL TRUE WORD COUNT:  104372
=== TOTAL TEST WORD COUNT:  101130
=== TOTAL TRUE WORDS RECALL:    0.799
=== TOTAL TEST WORDS PRECISION: 0.824
=== F MEASURE:  0.811
=== OOV Rate:   0.058
=== OOV Recall Rate:    0.360
=== IV Recall Rate: 0.825

Segmentation performance of different algorithms on the pku test set:

| algorithm | P | R | F | OOV Rate | OOV Recall | IV Recall |
|---|---|---|---|---|---|---|
| maximum matching | 0.836 | 0.904 | 0.869 | 0.058 | 0.059 | 0.956 |
| maximum probability | 0.859 | 0.919 | 0.888 | 0.058 | 0.085 | 0.970 |
| HMM | 0.804 | 0.789 | 0.796 | 0.058 | 0.364 | 0.815 |
| Full Second Order HMM | 0.824 | 0.799 | 0.811 | 0.058 | 0.360 | 0.825 |

References

[1] Brants, Thorsten. "TnT: A Statistical Part-of-Speech Tagger." Proceedings of the Sixth Conference on Applied Natural Language Processing. Association for Computational Linguistics, 2000.
[2] Thede, Scott M., and Mary P. Harper. "A Second-Order Hidden Markov Model for Part-of-Speech Tagging." Proceedings of the 37th Annual Meeting of the Association for Computational Linguistics, 1999.

Tags: word segmentation, artificial intelligence, natural language processing, NLP
