第七部分 算法问题选编
第二十七章 多线程算法*
片上多处理器和其他共享存储并行计算机的编程都有一个共同之处,就是使用静态线程(static threading) 。静态线程提供了一个“虚拟处理器”的软件抽象,即线程(thread), 这些线程共享一个相同的存储器。每个线程维护一个关联的程序计数器,并能与其他线程相互独立地执行代码。操作系统加载一个线程到处理器上执行,并且在其他的线程需要运行时再把它交换出来。虽然操作系统允许程序员去创建和销毁线程,但这些操作相对较慢。因此,对于大多数应用,线程在计算期间都维待着,这是称之为“静态的“原因。
动态多线程编程
动态多线程(dynamic multithreading) 是一类重要的并发平台。在动态多线程中,程序员只需描述应用中的并行性,不必关心通信协议、负载平衡和静态线程编程的其他各种间题。这种并发平台包含一个调度器,它能自动地进行负载平衡计算,大大减轻了程序员的负担。
动态多线几乎都支持两个特征:
- 嵌套并行:嵌套并行允许派生一个子过程,且允许派生的子过程在计算自已结果的同时调用者继续执行。
- 并行循环:并行循环如同普通的for 循环一样,只是因循环中的迭代可以并发执行而有所不同。
动态多线程模型具有如下几个重要的优点:
- 它是串行编程模型的一个简单扩展。通过在伪代码中加入三个”并发“关键词(parallel 、spawn 和sync) 来描述一个多线程算法。此外,如果从多线程伪代码中删除这些并发关键词,剩下的文本就是原问题的串行伪代码,我们称之为多线程算法的"串行化”。
- 它从理论上提供了一种基于“工作量”和“持续时间”概念的简洁方式来量化并行性。
- 许多涉及嵌套并行的多线程算法都比较自然地服从分治模式。此外,正如串行分治算法通过求解递归式来分析那样,这类多线程算法的分析也是如此。
- 该模型符合并行计算发展的实际情况。越来越多的并发平台支持动态多线程的一种形式或其他形式。
27.1 动态多线程基础
以递归计算斐波那契数为例,开始对动态多线程的探讨
FIB(n)
if n <= 1
return n
else
x=FIB(n-1)
y=FIB(n-2)
return x+y
注意到FIB(n) 中,第 5 行和第 6 行分别调用了两个递归函数 FIB(n-1) 和 FIB(n-2) 这两个递归函数是彼此独立的:它们能以两种次序中的一种被调用,并且调用进行的计算并不会影响到另一个。因此,这两个递归调用可以并行执行。
通过在伪代码中加入并发关键宇spawn 和sync 来表示并行性。下面说明如何使用动态多线程来重写FIB 过程:
P-FIB(n)
if n <= 1
return n
else
x = spawn P-FIB(n—1)
y = P-FIB(n-2)
sync
return x+y
当关键字spawn执行一个过程调用时,就出现了嵌套并行。spawn的语义不同于传统的过程调用,执行调用spawn的过程(即父进程)可以与派生子过程(即子进程)并行执行,而不是像串行执行一样等待子过程计算完。这种情况下,派生子进程计算P一F(一1)的同时,父进程以并行方式可以继续计算第4行中的P-FIB(n—2)o因为P-FIB过程是递归的,这两个子过程调用自己又产生了嵌套并行,它们派生的子过程也是如此,因此产生了一个潜在的、非常大的子计算树,所有计算都能并行执行。
关键字spawn 并不意味着一个过程调用必须要与其派生子过程同时执行,这里只是允许这样。并发关键字表达了计算的逻辑井行(logical parallelism) , 说明了计算中哪些部分可以并行处理。运行时由一个调度器(scheduler) 负责,随着计算的展开决定哪些子计算实际并发执行并将它们分配到可用的处理器上。
直到执行了sync 同步语旬,一个过程才能安全地使用其派生子过程的返回值。关键字sync 表明,过程在执行sync 后面的语句前,必须等到它的所有派生子过程计算完成。
多线程执行的模型
将多线程计算(可由处理器执行的运行时指令的集合,代表多线程程序)看成一个有向无环图G=CV, E), 又称为计算有向无环图(computation dag) ,V 中的顶点代表指令, E 中的边代表指令间的依赖关系,其中 表示指令u 必须在v 之前执行。为了方便起见,如果一段指令中不包含并行控制(即没有spawn 、sync 和来自派生进程的return, 这种return 是显式return 语句,或是程序执行完毕后隐式执行的return ), 可以将它们串成一个链(strand), 这样每个链可以代表一个或者更多的指令。涉及并行控制的指令并不包含在链中,但它们会在有向无环图中被表示出来。
如果一个链有两个后继,则其中一个肯定是派生的;如果一个链有多个前驱,就表明由千有sync 语句而将这些前驱汇合在一起。因此,在一般情况下,集合V 就是链的集合,有向边集合E 是由并行控制产生的链之间的关联。如果G 有一条从链u 到链v 的有向路径,我们称这两个链是(逻辑上)串联的(in series) 。否则,链u 和链v 是(逻辑上)井联的 (in parallel) 。
我们能对一个计算有向无环图中的边进行分类,来表示不同链之间的依赖关系类型。图 27-2 中水平画出的一条连接边 (continuation edge) $\left(u, u^{\prime}\right) $ 将链 u 连接到它的后继 上,它们在同一个 过程实例内。当链 u 派生链 v 时, 有向无环图包含一条派生边 (spawn edge) (u, v) , 它在图中指 向下。调用边 (call edge) 代表正常的过程调用, 其也指向下。链 u 派生链 v 与链 u 调用链 v 不同, 因为派生导出一条从 u 到链 的水平连接边, 意味着 可以和 v 同时执行, 然而一个函数调用 却没有这条边。当一个链 u 返回到它的调用函数时, 在函数调用过程中, 链 x 是紧随在 sync 之 后的链, 计算有向无环图中包含返回边(return edge) (u, x) , 它的方向是向上的。计算开始于初 始链 (initial strand) (图 27-2 中标为 P-FIB(4) 的黑色顶点), 且终止于结束链 (final strand) (图 27-2 中标为 P-FIB (4) 的白色顶点)。
性能度量
可以使用两种衡量标准来度量多线程算法的理论效率:工作量(work) 和持续时间(span) 。多线程计算的工作量是指在一个处理器上执行整个计算的总时间。换句话说,工作量就是每个链消耗时间的总和。如果计算有向无环图中的每个链耗费单位时间,那么工作量正是图中的顶点数。持续时间则是计算有向无环图中沿着任意一条路径链的最长执行时间。同样,如果计算有向无环图中每个链耗费单位时间,持续时间就是图中最长的路径或关键路径中的顶点数。
一个多线程计算的实际运行时间不仅取决千其工作址和持续时间,也取决于有多少可用的处理器数以及调度器如何对链进行处理器分配。为了表示多线程计算在P 个处理器上的运行时间,我们将用P 作下标。例如,我们用表示一个算法在P 个处理器上的运行时间。工作量是单个处理器上的运行时间,即 。如果每个链都拥有自己的处理器(换句话说,也就是有无限多的处理器),此时的运行时间就是持续时间。于是用 来表示持续时间。
调度
多线程调度器必须在事先不知道何时链被派生和结束的情况下来进行调度计算,它必须是在线(on-line) 的。此外,一个好的调度器能以分布式方式工作,其中实现调度器的线程协助计算中的负载平衡。
多线程算法的分析
如果两个子计算是串行连接的,它们的持续时间相加就形成了混合计算的持续时间;而如果它们是并行连接的,混合计算的持续时间就是这两个子计算中持续时间最大的。
井行循环
许多算法都包含循环,循环中的所有迭代能被并行执行。后面将会看到,使用spawn 和sync关键字并行化这些循环,可以很方便地直接标注使得这些循环并发执行的迭代。利用parellel 并发关键字,伪代码通过在for 循环语句的for 关键词前添加parallel 来实现这个功能。
竞争条件
一个多线程算法是确定的(deterministic) , 如果在同样的输入情况下总是做相同的事,且无论指令在多核计算机上如何被调度也是如此。一个多线程算法是非确定的(nondeterministic) , 如果每次执行它做的事情有所不同。一个多线程算法意图确定地做一些事情,但常常会失败,究其原因是算法中包含了“确定性竞争”。
当两个逻辑上并行指令访问存储器同一位置且至少有一个指令执行写操作的时候,便会发生确定性竞争(determinacy race) 。
虽然处理竞争有各种不同方法,包括使用互斥锁和其他的同步方法,但是对我们而言,简单的做法是确保并行运行的链是独立的:使它们之间不存在确定性竞争。因此,在一个parallel for结构中,所有迭代应该是独立的。在spawn 和对应的sync 之间,派生子过程的代码应该与父过程(包括其他派生过程和直接调用的程序)的代码相互独立。要注意的是传给派生子过程的参数应该在实际派生发生前在父过程中被计算出来,因而对于任何要访问那些派生子过程涉及的参数,都要在派生子过程执行完后再顺序地被访问。
27.2 多线程矩阵乘法
基千对4.2 节的SQUARE-MATRIXMULTIPLY过程中的循环进行并行化的
P-SQUARE-MATRIX-MULTIPLY(A, B)
n = A.rows
let C be a new nX n matrix
parallel for i = 1 ton
parallel for j = 1 ton
c(ij)=O
for k= 1 to n
c(ij) = c(ij) + a(ik) x b(kj)
return C
为了分析这个算法, 注意到其串行化版本就是 SQUARE-MATRIX-MULTIPLY, 因此容易 得到它的工作量 , 与 SQUARE-MATRIX-MULTIPLY 的运行时间相同。持续时间 是 $T_{\infty}(n)=\Theta(n) $, 因为它对应的是第 3 行开始的 paralle for 循环构成的递归树中的一条向下路 径, 然后第 4 行开始的 paralle for 循环构成的递归树向下, 再执行第 6 行开始的普通 for 循环的 所有 n 次迭代, 结果整个持续时间为 $\Theta(\lg n)+\Theta(\lg n)+\Theta(n)=\Theta(n) $ 。所以并行度为 $\Theta\left(n^{3}\right) / \Theta(n)= \Theta\left(n^{2}\right) $ 。
矩阵乘法的分治多线程算法
基于SQUARE-MATRIX-MULTIPLY-RECURSIVE 过程的多线程算法
P-MATRIX-MULTIPLY-RECURSIVE(C, A, B)
n = A.rows
if n == 1
c(11) = a(11)b(11))
else
let T be a new nXn matrix
partitionA, B, C, and Tinton/2Xn/2 submatrices
A11 , A12 , A21 , A22 ; ... ; C11 , C12 , C21 , C22;
and T11 , T12 , T21 , T22 ; respectively
spawn P-MATRIX -MULTIPLY-RECURSIVE(C11, A11, B11)
spawn P-MATRIX -MULTIPLY-RECURSIVE(C12, A11, B12)
spawn P-MATRIX-MULTIPLY-RECURSIVE(C21, A21, B11)
spawnP-MATRIX-MULTIPLY-RECURSIVE(C22, A21, B12)
spawn P-MATRIX-MULTIPLY-RECURSIVE(T11, A12, B21)
spawn P-MATRIX -MULTIPLY-RECURSIVE(T12, A12, B22)
spawn P-MATRIX -MULTIPLY-RECURSIVE(T21, A22, B21)
P-MATRIX-MULTIPLY-RECURSIVE(T22, A22, B22)
sync
parallel for i= 1 ton
parallel for j = 1 to n
c(ij) = c(ij) + t(ij)
P-MATRIX-MUL TIPL Y-RECUREIVE 并行度 ,这个值非常大。
多线程Strassen 算法
所以, 多线程 Strassen 方法的并行度是 , 这个数字仍然很 大, 但比 P-MATRIX-MULTIPLY-RECURSIVE 的并行度要小一些。
27.3 多线程归并排序
由于归并排序应用了分治模式,于是使用嵌套并行似乎不是多线程化的一个好的候选方法。可以简单地修改原伪代码,改第一个递归调用为派生的:
MERGE-SORT'CA, p, r)
if p < r
q = (p+r)/2
spawn MERGE-SORT'(A, p, q)
MERGE-SORT'(A, q+1, r)
sync
MERGE(A, p, q, r)
$\text { MERGE-SORT } \mathrm{T}^{\prime} $ 的并行度就是 $ M S_{1}^{\prime}(n) / M S_{\infty}^{\prime}(n)=\Theta(\lg n)$
基于 MERGE-SORT’ 的多线程归并排序 P-MERGE-SORT
BINARY-SEARCH(x, T, p, r)
low = p
high = max(p, r+1)
while low < high
mid = (low+high)/2
if x <= T[mid]
high = mid
else
low = mid+1
return high
P-MERGE(T, P1t ri, p,, r,, A, p,)
n1 = r1-p1 + 1
n2 = r2-p2 + 1
if n1 < n2 // ensure that n1 >= n2
exchange p, with P2
exchange r1 with r2
exchange n1 with n2
if n1 == 0 //both empty?
return
else q, =归+r1)/2」
q2 = BINARY-SEARCH(T[q1], T, p2, r2)
q3 = p3 + (q1 - p1) + (q2 - p2)
A[q3] = T[q1]
spawn P-MERGE(T, P1, q1-1, p2, q2-1, A, p2)
P-MERGE(T, q1 +1, r1, q2r2, A, q3+1)
sync
合并算法 P-MERGE 的并行度是 。
P-MERGE-SORTCA, p, r, B, s)
n = r-p+l
if n == 1
B[s] = A[p]
else
let T[1.. n] be a new array
q= (P+r)/2
q' = q-p+l
spawn P-MERGE-SORT(A, p, q, T, 1)
P-MERGE-SORTCA , q+1, r, T, q' + 1)
sync
P-MERGE(T , 1, q' , q' + 1, n, B, s)
算法 P-MERGE-SORT 的并行度是
第二十八章 矩阵运算
LUP 分解综述
LUP 分解背后的思想就是找出三个 $ n \times n $ 矩阵 $ L 、 U $ 和 $ P $, 满足 其中, $ L $ 是一个单位下三角矩阵, $ U $ 是一个上三角矩阵, $ P $ 是一个置换矩阵。我们称满足上式 的矩阵 $ L 、 U $ 和 $ P $ 为矩阵 $ A $ 的 LUP 分解。我们将说明每一个非奇异矩阵 $ A $ 都会有这样一 种分解。
已知P 、L 、U 和 b, 过程 LUP-SOLVE 通过结合使用正向替换与反向替换,求出x 的解。伪代码中假定维数n 出现在属性 L.rows 中,置换矩阵P 用数组 表示。
计算一个LU 分解
我们采用高斯消元法(Gaussian elimination) 来创建一个LU 分解。首先从其他方程中减去第一个方程的倍数,以把那些方程中的第一个变量消去。然后,从第三个及以后的方程中减去第二个方程的倍数,以把这些方程的第一个和第二个变量都消去。继续上述过程,直到系统变为一个上三角矩阵形式,实际上此矩阵就是U。矩阵L 是由消去变量所用的行的乘数组成。
我们对一个矩阵A 进行LU 分解的代码根据上述递归策略设计,只不过用一个迭代循环取代了递归过程。(这一转化是对“尾递归“过程(即最后的操作为自身递归调用的过程)进行标准的优化处理,参见思考题7-4 。)代码假定属性A. rows 表示A 的维度。我们初始化矩阵u, 使得对角线以下元素均为O; 以及矩阵L, 使得对角线元素都是1, 对角线以上元素都是0 。每次迭代都作用千一个子方阵,以其左上角元素为主元来计算v 和w 向量以及舒尔补,这样又生成一个子方阵,下次迭代将作用于这个子方阵。
我们对一个矩阵 $ A $ 进行 LU 分解的代码根据上述递归策略设计, 只不过用一个迭代循环取 代了递归过程。(这一转化是对“尾递归”过程 (即最后的操作为自身递归调用的过程)进行标准的 优化处理, 参见思考题 7-4。代码假定属性 $ A $. rows 表示 $ A $ 的维度。我们初始化矩阵 $ U $, 使得对 角线以下元素均为 0 ; 以及矩阵 $ L $, 使得对角线元素都是 1 , 对角线以上元素都是 0 。每次迭代都 作用于一个子方阵, 以其左上角元素为主元来计算 $ v $ 和 $ w $ 向量以及舒尔补, 这样又生成一个子 方阵, 下次迭代将作用于这个子方阵。
LU-DECOMPOSITION(A)
n = A.rows
let L and U be new nXn matrices
initialize U with 0s below the diagonal
initialize L with 1s on the diagonal and 0s above the diagonal
for k=1 to n
U(kk) = a(kk)
for i = k+1 to n
l(ik) = a(ik)/u(kk) // a(ik) holds u(kk)
u(ki) = a(ki) // u(ki) holds a(ki)
for i=k+1 to n
for j=k+1 to n
a(ij) = a(ij) - l(ik)u(kj)
return L and U
LU-DECOMPOSITION 运行时间为 $ \Theta\left(n^{3}\right)$ 。
28.2 矩阵求逆
通过LUP 分解计算逆矩阵
假设有一个矩阵 $ A $ 的 LUP 分解, 包括三个矩阵 $ L 、 U $ 和 $ P $, 满足 $ P A=L U $ 。运用 LUPSOLVE, 可以在 $ \Theta\left(n^{2}\right) $ 时间内求解一个具有 $ A x=b $ 形式的方程。因为 LUP 分解取决于 $ A $ 而 不是 $ b $, 我们能在第二个方程 $ A x=b^{\prime} $ 上运行 LUP-SOLVE, 额外时间复杂度为 $ \Theta\left(n^{2}\right) $ 。一般 而言, 一旦得到 $ A $ 的 LUP 分解, 就可以在 $ \Theta\left(k n^{2}\right) $ 时间内求解方程 $ A x=b, k $ 的值因 $ b $ 的不同 而改变。
我们可以考虑方程 $ A X=I_{n} $ 。它以一个含 $ n $ 个方程 (形式为 $ A x=b $ ) 的方程组方式定义了矩阵 $ X $, 即 $ A $ 的逆矩阵。更准确地说. 令 $ X_{i} $ 表示 $ X $ 的第 $ i $ 列, 回顾单位向量 $ e_{i} $ 是 $ I_{n} $ 的第 $ i $ 列。
于是可以利用 $ A $ 的 LUP 分解求解方程 (28.10) 中的 $ X A X_{i}=e_{i}$ 中的 $ X_{i} $ 。一旦得到 LUP 分解, 就可以在 $ \Theta\left(n^{2}\right) $ 时间内计算 $ n $ 个 $ X_{i} $ 列中的每一个, 因此可以在 $ \Theta\left(n^{3}\right) $ 时间内从 $ A $ 的 LUP 分解计算 $ X $ 。既然可以在 $ \Theta\left(n^{3}\right) $ 时间内确定 $ A $ 的 LUP 分解, 我们就可以在 时间内求出矩阵 A 的逆 。
第三十一章 数论算法
31.2 最大公约数
计算最大公约数的欧几里得算法基千如下定理。
GCD 递归定理 对任意非负整数 $ a $ 和任意正整数 $ b \operatorname{gcd}(a, b)=\operatorname{gcd}(b, a \bmod b)$
欧几里得算法
EUCLID(a,b)
if b == 0
return a
else
return EUCLID(b, a mod b)
欧几里得算法的扩展形式
推广该算法用千计算出满足下列条件的整系数x 和Y: 注意, x 与y 可能为0 或负数。我们将会发现这些系数对计算模乘法的逆是非常有用的。过程EXTENDED-.EUCLID 的输入为一对非负整数,其返回一个满足上式的三元组(d, x, y) 。
EXTENDED-EUCLID(a, b)
if b==O
return (a,1,0)
else
(d',x',y') = EXTENDED-EUCLID(b,a mod b)
(d,x,y)=(d',y',x' - a/b*y')
return (d,x,y)
31.4 求解模线性方程
现在来考虑求解下列方程的问题: 其中 $ a>0, n>0 $ 。这个问题有若干种应用。假设已知 $ a, b $ 和 $ n $, 希望找出所有满足上式 的对模 $ n $ 的 $ x $ 值。这个方程可能没有解, 也可能有一个或多个这样的解。
下列算法可输出该方程的所有解。输入a 和n 为任意正整数, b 为任意整数。
MODULAR-LINEAR-EQUATION-SOLVER(a,b,n)
(d,x',y') = EXTENDED-EUCLID(a, n)
if d|b
x(0) = x'(b/d) mod n
for i=0 to d-1
print(x(0) + i(n/d)) mod n
else
print "no solutions"
31.6 元素的幕
用反复平方法求数的幕
数论计算中经常出现一种运算,就是求一个数的幕对另一个数的模运算,也称为模取幕。更明确地说,希望有一种高效的方法来计算出 的值,其中a, b 为非负整数, n 是一个正整数。采用b 的二进制表示,反复平方法可以高效地解决这个问题。
设 $ \left\langle b_{k}, b_{k-1}, \cdots, b_{1}, b_{0}\right\rangle $ 是 $ b $ 的二进制表示 (即二进制表示有 $ k+1 $ 位长, $ b_{k} $ 为最高有效位, $ b_{0} $ 为最低有效位)。随着 $ c $ 的值从 0 到 $ b $ 成倍增长, 下列过程最终计算出 $ a^{c} \bmod n_{0} $ 。
MODULAR-EXPONENTIATION(a,b,n)
c = 0
d = 1
let [b(k),b(k-1),... ,b(0)] be the binary representation of b
for i = k downto 0
c = 2c
d = (d*d) mod n
if b(i) == 1
c = c+1
d = (d*a) mod n
return d
第三十二章 字符串匹配
32. 1 朴素字符串匹配算法
朴素字符串匹配算法是通过一个循环找到所有有效偏移,该循环对 n-m+1 个可能的s 值进行检测,看是否满足条件 。
NAIVE-STRING-MATCHER(T,P)
n = T.length
m = P.length
for s = 0 to n-m
if P[1.. m] == T[s+1.. s+m]
print "Pattern occurs with shift" s
在最坏情况下,朴素字符串匹配算法运行时间为
32.2 Rabin-Karp 算法
给定一个模式 $ P[1 . m] $, 假设 $ p $ 表示其相应的十进制值。类似地, 给定文本 $ T[1 . n] $, 假设 $ t_{s} $ 表示长度为 $ m $ 的子字符串 $ T[s+1 \ldots s+m] $ 所对应的十进制值, 其中 $ s=0,1, \cdots, n-m $ 。当 然, 只有在 $ T[s+1 \ldots s+m]=P[1 \ldots m] $ 时, $ t_{s}=p $ 。如果能在时间 $ \Theta(m) $ 内计算出 $ p $ 值, 并在总时 间 $ \Theta(n-m+1) $ 内计算出所有的 $ t_{s} $ 值 $ \Theta $, 那么通过比较 $ p $ 和每一个 $ t_{s} $ 值, 就能在 $ \Theta(m)+\Theta(n- $ $ m+1)=\Theta(n) $ 时间内找到所有的有效偏移 $ s_{0} $ (目前, 暂不考虑 $ p $ 和 $ t_{s} $ 值可能很大的问题。)
为了解决$ p $ 和 $ t_{s} $ 值可能很大的问题,我们选取一个合适的模 q 来计算p和4 的模。我们可以在$ \Theta(m) $ 的时间内计算出模q 的p 值,并且可以在$ \Theta(n-m+1) $ 时间内计算出模q的所有 $ t_s $ 值。如果选模q 为一个素数,使得 10q 恰好满足一个计算机字长,那么可以用单精度算术运算执行所有必需的运算。在一般情况下,采用d 进制的字母表{ o, 1, …, d-1} 时,选取一个q 值,使得dq 在一个计算机字长内,然后调整递归式(32.1), 使其能够对模q 有效,式子变为:
但是基于模 q 得到的结果并不完美: 并不能说明 。但是另一方面, 如果 , 那么可以断定 , 从而确定偏移 s 是无效的。因此可以把测试 $ t_{s} \equiv p(\bmod q)$ 是立作为一种快速的启发式测试方法用于检测无效偏移s。。任何满足 的偏移s 都需要被进一步检测,看s 是真的有效还是仅仅是一个伪命中点。这项额外的测试可以通过检测条件 来完成,如果q 足够大,那么这个伪命中点可以尽量少出现,从而使额外测试的代价降低。
RABIN-KARP-MATCHER(T,P,d,q)
n = T.length
m = P.length
h = d^(m-1) mod q
p = 0
t(0) = 0
for i=1 to m // preprocessmg
p = (d x p + P[i]) mod q
t(0) = (d x t(0) + T[i]) mod q
for s = 0 to n-m // matching
if p == t(s)
if P[1.. m] = = T[s+ 1.. s+m]
print "Pattern occurs with shift" s
if s < n-m
t(s+1)=(d x (t(t(s)-T[s+1]h)+T[s+m+1]) mod q
Java实现
public class StringMatch {
public void rabinKarpMatcher(String t, String p, int d, int q) {
int n = t.length();
int m = p.length();
int h = (int) Math.pow(d, m - 1) % q;
int pHash = 0;
int tHash = 0;
// 计算第一个Hash值
for (int i = 0; i < m; i++) {
pHash = (d * pHash + p.charAt(i)) % q;
tHash = (d * tHash + t.charAt(i)) % q;
}
for (int s = 0; s < n - m + 1; s++) {
if (pHash == tHash) {
// 相等判断字符串是否相等
if (p.equals(t.substring(s, s + m))) {
System.out.println("Pattern occurs with shift " + s);
}
}
if (s < n - m) {
// 不相等计算下一个Hash值
tHash = (d * (tHash - t.charAt(s) * h) + t.charAt(s + m)) % q;
if (tHash < 0) {
// 保证余数不为复负数
tHash = tHash + q;
}
}
}
}
public static void main(String[] args) {
StringMatch match = new StringMatch();
match.rabinKarpMatcher("2359023141526739921", "31415", 10, 13);
}
}
分析
RABIN-KARP-MATCHER 的预处理时间为 $\Theta(m) $, 在最坏情况下, 它的匹配时间是 , 因为 Rabin-Karp 算法和朴素字符串匹配算法一样, 对每个有效偏移进行显式 验证。如果 并且 , 由于在 n-m+1 个可能的偏移中每一个都是有效的, 则验证所 需的时间为 $ \Theta((n-m+1) m)$ 。
32.3 利用有限自动机进行字符串匹配
很多字符串匹配算法都要建立一个有限自动机,它是一个处理信息的简单机器,通过对文本字符串T 进行扫描,找出模式P 的所有出现位置。本节将介绍一种建立这样自动机的方法。这些字符串匹配的自动机都非常有效:它们只对每个文本字符检查一次,并且检查每个文本字符时所用的时间为常数。因此,在模式预处理完成并建立好自动机后进行匹配所需要的时间为
字符串匹配自动机
定义一个辅助函数 $ \sigma $, 称 为对应 $ P $ 的后缀函数。函数 $ \sigma $ 是一个从 $ \Sigma^{*} $ 到 $ {0,1, \cdots, m} $ 上的映射, 满足 $ \sigma(x) $ 是 $ x $ 的后缀 $ P $ 的最长前缀的长度:
因为空字符串 $P_0=\varepsilon $ 是每一个字符串的后缀,所以后缀函数6 是良定义的。
给定模式 , 其相应的字符串匹配自动机定义如下:
- 状态集合Q 为 。开始状态 是0 状态,并且只有状态m 是唯一被接受的状态。
- 对任意的状态q 和字符a, 转移函数 定义如下:
我们定义 $ \delta(q, a)=\sigma\left(P_{q} a\right) $, 目的是记录已得到的与模式 $ P $ 匹配的文本字符串 $ T $ 的最长前 缀。考虑最近一次扫描 $ T $ 的字符。为了使 $ T $ 的一个子串 (以 $ T[i] $ 结尾的子串)能够和 $ P $ 的某些前 缀 $ P_{j} $ 匹配, 前缀 $ P_{j} $ 必须是 $ T_{i} $ 的一个后缀。假设 $ q=\phi\left(T_{i}\right) $, 那么在读完 $ T_{i} $ 之后, 自动机处在状 态 $ q $ 。设计转移函数 $ \delta $, 使用状态数 $ q $ 表示 $ P $ 的前缀和 $ T_{i} $ 后缀的最长匹配长度。也就是说, 在处 于状态 $ q $ 时, $ P_{q} $ コ $ T_{i} $ 并且 $ q=\sigma\left(T_{i}\right) $ 。(每当 $ q=m $ 时, 所有 $ P $ 的 $ m $ 个字符都和 $ T_{i} $ 的一个后缀匹 配, 从而得到一个匹配。)
32.4 Knuth-Morris-Pratt 算法
现在来介绍一种由 Knuth、Morris 和 Pratt 三人设计的线性时间字符串匹配算法。这个算法 无需计算转移函数 $ \delta $, 匹配时间为 $ \Theta(n) $, 只用到辅助函数 $ \pi $, 它在 $ \Theta(m) $ 时间内根据模式预先计 算出来, 并且存储在数组 $ \pi[1 . m] $ 中。数组 $ \pi $ 使得我们可以按需要“即时”有效地计算(在摊还 意义上来说)转移函数 $ \delta $ 。粗略地说, 对任意状态 $ q=0,1, \cdots, m $ 和任意字符 $ a \in \sum, \pi[q] $ 的 值包含了与 $ a $ 无关但在计算 $ \delta(q, a) $ 时需要的信息。由于数组 $ \pi $ 只有 $ m $ 个元素, 而 $ \delta $ 有 $ \Theta\left(m\left|\sum\right|\right) $ 个值, 所以通过预先计算 $ \pi $ 而不是 $ \delta $, 可以使计算时间减少一个 $ \sum $ 因子。
关千模式的前缀函数
模式的前缀函数 包含模式与其自身的偏移进行匹配的信息。这些信息可用于在朴素的字符串匹配算法中避免对无用偏移进行检测,也可以避免在字符串匹配自动机中,对整个转移函数 的预先计算。
下面是预计算过程的形式化说明。已知一个模式 $ P[1 \ldots m] $, 模式 $ P $ 的前缀函数是函数 $ \pi:{1,2, \cdots, m} \rightarrow{0,1, \cdots, m-1} $, 满足
即 $ \pi[q] $ 是 $ P_{q} $ 的真后缀 $ P $ 的最长前缀长度。
下面给出的Knuth-Morris-Pratt 匹配算法的伪代码就是KMP-MATCHER 过程。我们将看到,其大部分都是在模仿FINITE-AUTOMATON-MATCHER 。KMP-MATCHER 调用了一个辅助程序COMPUTE-PREFIX-FUNCTION 来计算 。
KMP-MATCHERCT,P)
n = T.length
m = P.lenfth
pi = COMPUTE-PREFIX-FUNCTION(P)
q = 0 // number of characters matched
for i = 1 to n // scan the text from left to right
while q > 0 and P[q + 1] != T[i]
q = pi[q] // next character does not match
if P[q + 1] == T[i]
q = q + 1 // next character matches
if q == m // is all of P matched?
print "Pattern occurs with shift" i-m
q = pi[q] // look for the next match
COMPUTE-PREFIX-FUNCTION(P)
m = P.length
let pi[1.. m] be a new array
pi[1] = 0
k = 0
for q = 2 to m
while k > 0 and P[k + 1] != P[q]
k = pi[k]
if P[k + 1] == P[q]
k = k + 1
pi[q] = k
return pi
Java实现
public class StringMatch {
private int[] computePrefix(String p) {
int[] prefix = new int[p.length()];
int k = 0;
for (int i = 1; i < prefix.length; i++) {
// 不相等时,判断是否存在子对称
while (k > 0 && p.charAt(k) != p.charAt(i)) {
// 找前一个前缀字符的下一个
k = prefix[k - 1];
}
// 子对称,或者继承对称
if (p.charAt(k) == p.charAt(i)) {
k++;
}
prefix[i] = k;
}
return prefix;
}
public void kmpMatcher(String t, String p) {
int[] prefix = computePrefix(p);
System.out.println(Arrays.toString(prefix));
int k = 0;
for (int i = 0; i < t.length(); i++) {
while (k > 0 && p.charAt(k) != t.charAt(i)) {
k = prefix[k - 1];
}
if (p.charAt(k) == t.charAt(i)) k++;
if (k == p.length()) {
System.out.println("KMP pattern occurs with shift " + (i - k));
k = prefix[k - 1];
}
}
}
public static void main(String[] args) {
StringMatch match = new StringMatch();
match.kmpMatcher("agcgsgcgagctagacagctagctgggscgcsgcagctagcagctagctggsceeg", "agctagcagctagctg");
}
}
分析
算法复杂度为$ \Theta(n) $
评论区