"""
MCMC over a registered forward model.
"""
from __future__ import annotations
import numpy as np
from .config import Config
from .fitting import log_posterior
from .models import get_model
__all__ = ["run_mcmc", "summarise", "summarise_samples", "gelman_rubin"]
# diagnostics
[docs]
def gelman_rubin(chains: list[np.ndarray]) -> float:
"""
Gelman-Rubin :math:'\\hat R' for a list of 1-D chains.
''< 1.01'' is well-converged; ''> 1.1'' indicates problems.
"""
m = len(chains)
n = min(len(c) for c in chains)
chains = [np.asarray(c[:n]) for c in chains]
w = np.mean([c.var(ddof=1) for c in chains])
if w == 0.0:
return float("nan")
chain_means = np.array([c.mean() for c in chains])
b = n * chain_means.var(ddof=1)
var_hat = (1 - 1 / n) * w + b / n
return float(np.sqrt(var_hat / w))
# emcee path
def _initial_ball(p_map: np.ndarray, n_walkers: int, prior_lo, prior_hi, rng,
fractional_jitter: float = 0.03,
) -> np.ndarray:
"""
Initial Gaussian ball around MAP, clipped into the prior box."""
p_map = np.asarray(p_map, dtype=float)
lo = np.asarray(prior_lo, dtype=float)
hi = np.asarray(prior_hi, dtype=float)
pad = 1e-6 * (hi - lo)
def one():
p = p_map * (1 + rng.normal(0, fractional_jitter, p_map.size))
# nudge anything that landed at/below 0 up to a small positive value
p = np.where(p <= 0, np.maximum(p_map * 0.1, lo + pad), p)
return np.clip(p, lo + pad, hi - pad)
return np.array([one() for _ in range(n_walkers)])
def _run_emcee(data_bins, sigma_res, p_map, model, cfg, extras, verbose,
):
import emcee
fm = get_model(model)
rng = np.random.default_rng(cfg.random_seed)
p0 = _initial_ball(p_map, cfg.n_walkers, fm.prior_lo, fm.prior_hi, rng)
def log_prob(p):
return log_posterior(p, data_bins, sigma_res, model=model, **extras)
sampler = emcee.EnsembleSampler(cfg.n_walkers, fm.n_params, log_prob)
total = cfg.n_warmup + cfg.n_keep
if verbose:
print(f" emcee: {cfg.n_walkers} walkers × {total} steps "
f"(thin={cfg.thin}, n_dim={fm.n_params}, model={model!r})")
sampler.run_mcmc(p0, total, progress=False)
samples = sampler.get_chain(discard=cfg.n_warmup, thin=cfg.thin, flat=True)
if verbose:
print(f" acceptance fraction: "
f"{np.mean(sampler.acceptance_fraction):.3f}")
try:
tau = sampler.get_autocorr_time(quiet=True)
print(f" autocorrelation time: " +
" ".join(f"{t:.1f}" for t in tau))
except Exception:
pass
print(f" total samples kept: {len(samples)}")
return samples
# MH fallback
def _run_mh(data_bins, sigma_res, p_map, model, cfg, extras, verbose):
fm = get_model(model)
rng_global = np.random.default_rng(cfg.random_seed)
n_dim = fm.n_params
p_map = np.asarray(p_map, dtype=float)
step = np.maximum(np.abs(p_map) * 0.05, 1e-6)
n_total = cfg.n_warmup + cfg.n_keep
def chain(start, seed):
rng = np.random.default_rng(seed)
cur = start.copy()
cur_lp = log_posterior(cur, data_bins, sigma_res,
model=model, **extras)
out = [cur.copy()]
n_acc = 0
for _ in range(n_total):
new = cur + rng.normal(0, step, n_dim)
new_lp = log_posterior(new, data_bins, sigma_res,
model=model, **extras)
if np.log(rng.random() + 1e-300) < new_lp - cur_lp:
cur, cur_lp = new, new_lp
n_acc += 1
out.append(cur.copy())
return np.array(out), n_acc / n_total
n_chains = 4
chains = []
if verbose:
print(f" MH fallback: {n_chains} chains × {n_total} steps "
f"(thin={cfg.thin}, n_dim={n_dim})")
for cid in range(n_chains):
start = p_map * (1 + rng_global.normal(0, 0.03, n_dim))
ch, acc = chain(start, cfg.random_seed + cid)
chains.append(ch[cfg.n_warmup::cfg.thin])
if verbose:
print(f" chain {cid+1}: acceptance={acc:.3f} "
f"kept={len(chains[-1])}")
rhats = [gelman_rubin([c[:, i] for c in chains]) for i in range(n_dim)]
if verbose:
print(f" Gelman-Rubin R̂: " + " ".join(f"{r:.4f}" for r in rhats))
return np.vstack(chains)
# top-level entry point
[docs]
def run_mcmc(data_bins,
sigma_res,
p_map,
model: str = "anisotropic_rotor",
cfg: Config | None = None,
verbose: bool = True,
**extras,
) -> np.ndarray:
"""Run MCMC over a registered forward model.
Parameters
----------
data_bins : list
From :func:'qens.fitting.build_data_bins'.
sigma_res : float | array | list[array]
Resolution: scalar Gaussian σ in meV, single measured kernel, or
one kernel per Q-bin.
p_map : array
MAP starting point (from :func:'qens.fitting.find_map').
model : str
Registered forward-model name.
cfg : Config
verbose : bool
extras :
Forwarded to the model's ''predict'' callable.
Returns
-------
samples : ndarray of shape ''(n_kept, n_dim)''
"""
if cfg is None:
cfg = Config()
try:
import emcee # noqa: F401
return _run_emcee(data_bins, sigma_res, p_map, model, cfg,
extras, verbose)
except ImportError:
if verbose:
print(" emcee not found — using Metropolis-Hastings fallback "
"(consider: pip install emcee)")
return _run_mh(data_bins, sigma_res, p_map, model, cfg,
extras, verbose)
# summarisation
[docs]
def summarise(arr: np.ndarray, label: str = "", verbose: bool = True
) -> tuple[float, float, float]:
"""
Median and 95% credible interval for a single parameter chain.
"""
arr = np.asarray(arr)
lo, hi = np.percentile(arr, [2.5, 97.5])
med = float(np.median(arr))
if verbose and label:
print(f" {label:<24} median={med:.5f} "
f"95% CI=[{lo:.5f}, {hi:.5f}]")
return med, float(lo), float(hi)
[docs]
def summarise_samples(samples: np.ndarray,
model: str = "anisotropic_rotor",
derived: dict | None = None,
verbose: bool = True,
) -> dict[str, tuple[float, float, float]]:
"""
Per-parameter median + 95% CI for a registered model.
Parameters
----------
samples : ndarray, shape (n, n_dim)
model : str
Registered model name (used to look up parameter names).
derived : dict, optional
''{label: callable(samples) -> 1-D array}'' — extra derived
quantities to summarise (e.g. ''D_s / D_t'' for anisotropic).
verbose : bool
Returns
-------
dict[label, (median, lo95, hi95)]
"""
fm = get_model(model)
out: dict[str, tuple[float, float, float]] = {}
for i, name in enumerate(fm.param_names):
out[name] = summarise(samples[:, i], name, verbose=verbose)
if derived:
for label, fn in derived.items():
out[label] = summarise(fn(samples), label, verbose=verbose)
return out