别再死磕公式了!用Python+NumPy手把手模拟MCMC采样(附完整代码)
用Python实战MCMC采样:从零构建Metropolis-Hastings算法
当我在第一次接触贝叶斯统计时,被那些复杂的数学公式和理论推导弄得晕头转向。直到有一天,我决定抛开公式,直接用代码实现一个最简单的MCMC采样器,一切才变得清晰起来。这就是本文的由来——我们将用NumPy从零开始构建一个Metropolis-Hastings采样器,通过代码而非公式来理解MCMC的核心思想。
1. 环境准备与问题设定
在开始编写采样器之前,我们需要明确几个关键点:
- 目标:从一个已知的概率分布中抽取样本,但我们无法直接从这个分布中采样
- 工具:Python 3.x + NumPy + Matplotlib(用于可视化)
- 示例分布:假设我们要采样的目标分布是p(x) = 0.3N(-3,1) + 0.7N(3,1),即两个正态分布的混合
首先设置环境:
import numpy as np import matplotlib.pyplot as plt from scipy.stats import norm # 设置随机种子保证结果可复现 np.random.seed(42) # 定义目标分布 def target_distribution(x): return 0.3 * norm.pdf(x, loc=-3, scale=1) + 0.7 * norm.pdf(x, loc=3, scale=1)提示:在实际应用中,目标分布可能复杂得多,甚至没有解析表达式。这里选择混合正态分布是因为它足够简单,同时又具有多峰特性,适合演示MCMC的能力。
2. Metropolis-Hastings算法核心实现
Metropolis-Hastings算法的核心在于通过一个提议分布(proposal distribution)来生成新的候选样本,然后根据一定的概率决定是否接受这个新样本。以下是关键步骤:
- 初始化:选择一个起始点x0
- 迭代:对于每次迭代t: a. 从提议分布q(x'|x_t)生成候选x' b. 计算接受概率α = min(1, p(x')q(x_t|x') / p(x_t)q(x'|x_t)) c. 以概率α接受x'作为x_{t+1},否则保留x_t
实现代码如下:
def metropolis_hastings(target, n_samples, initial_value, proposal_width=1): samples = [initial_value] current_value = initial_value for _ in range(n_samples): # 从正态分布生成候选样本 proposal = np.random.normal(current_value, proposal_width) # 计算接受概率 prob_accept = min(1, target(proposal) / target(current_value)) # 决定是否接受 if np.random.rand() < prob_accept: current_value = proposal samples.append(current_value) return np.array(samples)注意:这里我们使用了对称的提议分布(正态分布),因此q(x'|x)=q(x|x'),接受概率简化为p(x')/p(x_t)。这是Metropolis算法,是Metropolis-Hastings的特例。
3. 运行采样器与结果分析
现在让我们运行这个采样器并分析结果:
# 运行采样器 samples = metropolis_hastings(target_distribution, n_samples=10000, initial_value=0) # 绘制采样结果 plt.figure(figsize=(12, 6)) x = np.linspace(-10, 10, 1000) plt.plot(x, target_distribution(x), 'r-', label='Target Distribution') plt.hist(samples[500:], bins=50, density=True, alpha=0.6, label='Samples') plt.legend() plt.show()几个关键观察点:
- Burn-in期:前500个样本被丢弃,这是为了让链达到稳定状态
- 采样质量:直方图与目标分布曲线匹配良好,说明采样有效
- 混合程度:查看时间序列图可以评估链在不同模式间的跳跃情况
# 绘制采样序列 plt.figure(figsize=(12, 4)) plt.plot(samples) plt.xlabel('Iteration') plt.ylabel('Sample Value') plt.title('MCMC Sample Trace') plt.show()4. 调优与常见问题解决
在实际应用中,你可能会遇到以下问题及解决方案:
4.1 提议分布宽度选择
提议分布的宽度(proposal_width)对采样效率至关重要:
- 过小:接受率高但探索能力差,导致自相关高
- 过大:拒绝率高,采样效率低下
经验法则:调整宽度使接受率在20-50%之间
def calculate_acceptance_rate(samples): return np.mean(np.diff(samples) != 0) print(f"Acceptance rate: {calculate_acceptance_rate(samples):.2%}")4.2 处理多峰分布
对于多峰分布(如我们的例子),链可能会被困在一个模式中。解决方法包括:
- 使用更大的提议分布宽度
- 尝试不同的初始值
- 使用并行链并检查收敛性
4.3 收敛诊断
常用收敛诊断方法:
- 目视检查:观察时间序列图是否稳定
- Gelman-Rubin统计量:比较多条链的方差
- 自相关图:检查样本自相关性衰减速度
from statsmodels.graphics.tsaplots import plot_acf plot_acf(samples[500:], lags=50) plt.show()5. 进阶技巧与性能优化
当你的MCMC实现基本工作后,可以考虑以下优化:
5.1 自适应MCMC
在运行过程中动态调整提议分布:
def adaptive_mh(target, n_samples, initial_value, initial_width=1): samples = [initial_value] current_value = initial_value current_width = initial_width for i in range(n_samples): proposal = np.random.normal(current_value, current_width) prob_accept = min(1, target(proposal) / target(current_value)) if np.random.rand() < prob_accept: current_value = proposal samples.append(current_value) # 每100次迭代调整宽度 if i % 100 == 0 and i > 0: current_width = np.std(samples[-100:]) * 2.4 # 2.4是经验值 return np.array(samples)5.2 并行运行多条链
利用多核CPU并行运行多条链,然后合并结果:
from multiprocessing import Pool def run_chain(args): target, n_samples, initial_value = args return metropolis_hastings(target, n_samples, initial_value) with Pool() as p: chains = p.map(run_chain, [(target_distribution, 2500, i) for i in [-5, 0, 5]]) all_samples = np.concatenate(chains)5.3 使用Numba加速
对于复杂的目标分布,可以使用Numba进行JIT编译加速:
from numba import njit @njit def target_distribution_numba(x): return 0.3 * np.exp(-0.5*(x+3)**2)/np.sqrt(2*np.pi) + \ 0.7 * np.exp(-0.5*(x-3)**2)/np.sqrt(2*np.pi) @njit def metropolis_hastings_numba(target, n_samples, initial_value, proposal_width=1): samples = np.zeros(n_samples + 1) samples[0] = initial_value current_value = initial_value for i in range(1, n_samples + 1): proposal = np.random.normal(current_value, proposal_width) prob_accept = min(1, target(proposal) / target(current_value)) if np.random.rand() < prob_accept: current_value = proposal samples[i] = current_value return samples6. 实际应用案例:贝叶斯线性回归
让我们看一个更实际的例子——用MCMC进行贝叶斯线性回归参数估计。假设我们有模型y = ax + b + ε,ε~N(0,σ²),我们要估计a、b和σ。
# 生成模拟数据 np.random.seed(42) x = np.linspace(0, 10, 50) true_a, true_b = 1.5, -2 y = true_a * x + true_b + np.random.normal(0, 1, len(x)) # 定义似然和先验 def log_prior(params): a, b, sigma = params if sigma <= 0: return -np.inf return norm.logpdf(a, 0, 10) + norm.logpdf(b, 0, 10) + norm.logpdf(sigma, 0, 10) def log_likelihood(params, x, y): a, b, sigma = params y_pred = a * x + b return np.sum(norm.logpdf(y, y_pred, sigma)) def log_posterior(params, x, y): lp = log_prior(params) if not np.isfinite(lp): return -np.inf return lp + log_likelihood(params, x, y) # 运行MCMC def mh_regression(x, y, n_samples=10000): params = np.array([0.0, 0.0, 1.0]) # 初始值 samples = np.zeros((n_samples + 1, 3)) samples[0] = params for i in range(1, n_samples + 1): proposal = params + np.random.normal(0, 0.1, 3) prob_accept = min(1, np.exp(log_posterior(proposal, x, y) - log_posterior(params, x, y))) if np.random.rand() < prob_accept: params = proposal samples[i] = params return samples samples = mh_regression(x, y)这个例子展示了如何将MCMC应用于实际的统计建模问题。通过检查采样结果,我们可以得到参数的后验分布,进而进行统计推断。
