In [1]:
# 第0步:安装依赖(JupyterLite 需要先安装 openpyxl)import micropipawait micropip.install("openpyxl")print("openpyxl 安装完成")# 第1步:导入库import warningsimport datetimeimport timefrom pathlib import Pathimport reimport numpy as npimport pandas as pdimport matplotlib.pyplot as pltwarnings.filterwarnings("ignore")plt.rcParams["axes.unicode_minus"] = Falsefor font in ["Noto Sans CJK SC", "WenQuanYi Micro Hei", "SimHei", "Microsoft YaHei"]:    try:        plt.rcParams["font.sans-serif"] = [font]        break    except Exception:        continueprint("库导入完成")# 第2步:时间戳与日志工具def get_timestamp():    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")def get_readable_time():    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")class RunLogger:    def __init__(self, prefix="run"):        self.timestamp = get_timestamp()        self.readable_time = get_readable_time()        self.log_file = f"{prefix}_log_{self.timestamp}.txt"        self.lines = []        self.log("=" * 60)        self.log("运行日志")        self.log(f"开始时间: {self.readable_time}")        self.log(f"时间戳: {self.timestamp}")        self.log("=" * 60)        def log(self, message):        self.lines.append(message)        print(message)        def log_section(self, title):        self.log("")        self.log(f"【{title}】")        self.log("-" * 40)        def log_param(self, name, value):        self.log(f"  {name}: {value}")        def log_result(self, name, value):        self.log(f"  {name}: {value}")        def finish(self):        self.log("")        self.log("=" * 60)        self.log("运行结束")        self.log(f"总耗时: {elapsed:.2f} 秒")        self.log("=" * 60)        with open(self.log_file, "w", encoding="utf-8") as f:            f.write("\n".join(self.lines))        print(f"\n日志已保存: {self.log_file}")        return self.log_filedef save_with_timestamp(df, prefix, timestamp, suffix="csv"):    filename = f"{prefix}_{timestamp}.{suffix}"    df.to_csv(filename, index=False, encoding="utf-8-sig")    return filename# 初始化日志ts = get_timestamp()logger = RunLogger(prefix="yield_curve")logger.log_section("初始化")logger.log_param("时间戳", ts)logger.log_param("环境", "JupyterLite / Pyodide")# 第3步:全局配置KEY_TENORS = np.array([    0.02, 0.04, 0.08, 0.17, 0.25, 0.50, 0.75,    1.0, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0, 20.0, 30.0, 50.0])def classify_curve_type(file_name):    name = str(file_name)    if "AAA" in name.upper():        return "AAA"    if "中债曲线" in name:        return "中债曲线"    return Nonedef tenor_label(x):    x = float(x)    if abs(x - 0.02) < 0.006:        return "7D"    if abs(x - 0.04) < 0.006:        return "14D"    if x < 1:        return f"{round(x * 12)}M"    if abs(x - round(x)) < 1e-8:        return f"{int(round(x))}Y"    return f"{x:.2f}Y"def extract_file_year(file_name):    match = re.search(r"(20\d{2})", str(file_name))    return int(match.group(1)) if match else Nonelogger.log_section("配置参数")logger.log_param("关键期限", [tenor_label(t) for t in KEY_TENORS])# 第4步:读取数据def read_yield_excel(path):    curve_type = classify_curve_type(path.name)    if curve_type is None:        return pd.DataFrame()    xls = pd.ExcelFile(path)    sheet = xls.sheet_names[0]    df = pd.read_excel(path, sheet_name=sheet)    df.columns = [str(c).strip() for c in df.columns]    rename = {        "日期": "date",        "标准期限说明": "tenor_text",        "标准期限(年)": "maturity_years",        "收益率(%)": "yield_percent",        "收益率": "yield_percent",    }    df = df.rename(columns={c: rename.get(c, c) for c in df.columns})    required = ["date", "maturity_years", "yield_percent"]    for col in required:        if col not in df.columns:            return pd.DataFrame()    df = df[required].copy()    df["date"] = pd.to_datetime(df["date"], errors="coerce")    df["maturity_years"] = pd.to_numeric(df["maturity_years"], errors="coerce")    df["yield_percent"] = pd.to_numeric(df["yield_percent"], errors="coerce")    df = df.dropna(subset=required)    df["curve_type"] = curve_type    df["year"] = df["date"].dt.year    return df# 读取所有 Excel 文件files = sorted(Path("/kaggle/input/datasets/songsammy/yangrui-bond-curves").glob("*.xlsx"))all_data = []for f in files:    df = read_yield_excel(f)    if not df.empty:        all_data.append(df)        logger.log(f"读取: {f.name}")data = pd.concat(all_data, ignore_index=True)data = data[(data["maturity_years"] > 0) & (data["yield_percent"] > 0)]data = data.sort_values(["curve_type", "year", "date", "maturity_years"]).reset_index(drop=True)logger.log_section("数据加载完成")logger.log_param("总行数", len(data))logger.log_param("曲线类型", data["curve_type"].unique().tolist())logger.log_param("年份", sorted(data["year"].unique().tolist()))print(data.groupby(["curve_type", "year"]).size())# 第5步:KRR(核岭回归)模型def rbf_kernel(X1, X2, gamma):    X1 = np.asarray(X1, dtype=float)    X2 = np.asarray(X2, dtype=float)    if X1.ndim == 1:        X1 = X1.reshape(-1, 1)    if X2.ndim == 1:        X2 = X2.reshape(-1, 1)    sq_dists = (np.sum(X1**2, axis=1).reshape(-1, 1) + np.sum(X2**2, axis=1) - 2 * np.dot(X1, X2.T))    return np.exp(-gamma * sq_dists)def krr_fit(X_train, y_train, gamma, alpha):    K = rbf_kernel(X_train, X_train, gamma)    n = K.shape[0]    A = K + alpha * np.eye(n)    return np.linalg.solve(A, y_train)def krr_predict(X_train, alpha_coef, X_test, gamma):    K_s = rbf_kernel(X_test, X_train, gamma)    return K_s.dot(alpha_coef)def cv_rmse_krr(X, y, gamma, alpha, n_splits=5):    n = len(y)    if n < n_splits:        return np.nan    fold_size = n // n_splits    errs = []    for i in range(n_splits):        start = i * fold_size        end = (i + 1) * fold_size if i < n_splits - 1 else n        test_idx = list(range(start, end))        train_idx = [j for j in range(n) if j not in test_idx]        if len(train_idx) < 2:            continue        alpha_coef = krr_fit(X[train_idx], y[train_idx], gamma, alpha)        y_pred = krr_predict(X[train_idx], alpha_coef, X[test_idx], gamma)        errs.append(np.mean((y[test_idx] - y_pred) ** 2))    return np.sqrt(np.mean(errs)) if errs else np.nandef run_krr_pipeline(data, sample_days_per_group=None):    cleaned_records = []    param_records = []    fitted_records = []    key_records = []    metric_records = []    summary_records = []    groups = data.groupby(["curve_type", "year"])    for (ctype, year), g in groups:        g = g.sort_values("date").reset_index(drop=True)        if sample_days_per_group is not None:            unique_days = g["date"].unique()            if len(unique_days) > sample_days_per_group:                selected = np.random.choice(unique_days, sample_days_per_group, replace=False)                g = g[g["date"].isin(selected)].reset_index(drop=True)        cleaned_records.append(g.copy())        X_all = g["maturity_years"].values.reshape(-1, 1)        y_all = g["yield_percent"].values        # 超参数搜索        alpha_grid = [1e-5, 3e-5, 1e-4, 3e-4, 1e-3, 3e-3, 1e-2]        gamma_grid = [0.01, 0.03, 0.05, 0.1, 0.2, 0.5, 1.0]        best_cv = np.inf        best_alpha, best_gamma = 1e-3, 0.1        for alpha in alpha_grid:            for gamma in gamma_grid:                cv = cv_rmse_krr(X_all, y_all, gamma, alpha, n_splits=5)                if not np.isnan(cv) and cv < best_cv:                    best_cv = cv                    best_alpha, best_gamma = alpha, gamma        logger.log(f"KRR 参数选择: {ctype} {year} alpha={best_alpha}, gamma={best_gamma}, CV_RMSE={best_cv:.2f}bp")        param_records.append({            "curve_type": ctype, "year": year,            "alpha": best_alpha, "gamma": best_gamma, "cv_rmse_bp": best_cv        })        # 拟合整条曲线        alpha_coef = krr_fit(X_all, y_all, best_gamma, best_alpha)        x_fine = np.linspace(X_all.min(), X_all.max(), 200)        y_fine = krr_predict(X_all, alpha_coef, x_fine.reshape(-1, 1), best_gamma)        fitted_records.append(pd.DataFrame({            "curve_type": ctype, "year": year,            "maturity_years": x_fine, "fitted_yield": y_fine        }))        # 关键期限收益率        key_y = krr_predict(X_all, alpha_coef, KEY_TENORS.reshape(-1, 1), best_gamma)        key_records.append(pd.DataFrame({            "curve_type": ctype, "year": year,            "maturity_years": KEY_TENORS, "key_yield": key_y,            "tenor_label": [tenor_label(t) for t in KEY_TENORS]        }))        # 误差指标        y_pred = krr_predict(X_all, alpha_coef, X_all, best_gamma)        resid = y_all - y_pred        metric_records.append({            "curve_type": ctype, "year": year,            "rmse_bp": np.sqrt(np.mean(resid**2)) * 100,            "mae_bp": np.mean(np.abs(resid)) * 100,            "max_err_bp": np.max(np.abs(resid)) * 100,            "r2": 1 - np.sum(resid**2) / np.sum((y_all - np.mean(y_all))**2)        })        # 年度统计        summary_records.append({            "curve_type": ctype, "year": year,            "n_obs": len(g),            "mean_yield": np.mean(y_all),            "std_yield": np.std(y_all),            "min_yield": np.min(y_all),            "max_yield": np.max(y_all)        })    cleaned_df = pd.concat(cleaned_records, ignore_index=True)    param_df = pd.DataFrame(param_records)    fitted_df = pd.concat(fitted_records, ignore_index=True)    key_df = pd.concat(key_records, ignore_index=True)    metric_df = pd.DataFrame(metric_records)    summary_df = pd.DataFrame(summary_records)    return cleaned_df, param_df, fitted_df, key_df, metric_df, summary_df# 运行 KRR —— 使用 sample_days_per_group 控制每组采样天数# 设为 80 天与第一次代码效果类似,设为 None 则使用全部数据logger.log_section("开始 KRR 模型")logger.log("提示: 如运行过慢,可调整 sample_days_per_group 参数(如 80)减少数据量")krr_cleaned, krr_params, krr_fitted, krr_key, krr_metrics, krr_summary = run_krr_pipeline(    data, sample_days_per_group=80  # <-- 改为 None 使用全部数据,或调整天数)# 保存 KRR 结果save_with_timestamp(krr_cleaned, "krr_cleaned_data", ts)save_with_timestamp(krr_params, "krr_model_params", ts)save_with_timestamp(krr_fitted, "krr_fitted_curve", ts)save_with_timestamp(krr_key, "krr_key_tenor_yields", ts)save_with_timestamp(krr_metrics, "krr_fit_metrics", ts)save_with_timestamp(krr_summary, "krr_annual_summary", ts)logger.log_section("KRR 完成")logger.log_param("RMSE 均值 (bp)", krr_metrics["rmse_bp"].mean())logger.log_param("MAE 均值 (bp)", krr_metrics["mae_bp"].mean())# 第6步:NSS(Nelson-Siegel-Svensson)模型TAU1_GRID = np.array([0.25, 0.50, 0.75, 1.00, 1.50, 2.00, 3.00, 5.00])TAU2_GRID = np.array([2.00, 3.00, 5.00, 7.00, 10.00, 15.00, 20.00, 30.00])def nss_design_matrix(t, tau1, tau2):    t = np.asarray(t, dtype=float)    t = np.where(t <= 0, 1e-12, t)    f1 = (1.0 - np.exp(-t / tau1)) / (t / tau1)    f2 = f1 - np.exp(-t / tau1)    f3 = (1.0 - np.exp(-t / tau2)) / (t / tau2) - np.exp(-t / tau2)    return np.column_stack([np.ones_like(t), f1, f2, f3])def fit_nss_one_curve(x, y):    x = np.asarray(x, dtype=float)    y = np.asarray(y, dtype=float)    best = {"ssr": np.inf, "beta": np.array([np.nan]*4), "tau1": np.nan, "tau2": np.nan}    for tau1 in TAU1_GRID:        for tau2 in TAU2_GRID:            if tau2 <= tau1:                continue            X = nss_design_matrix(x, tau1, tau2)            try:                beta, *_ = np.linalg.lstsq(X, y, rcond=None)            except np.linalg.LinAlgError:                continue            resid = y - X @ beta            ssr = float(np.sum(resid ** 2))            if ssr < best["ssr"]:                best = {"ssr": ssr, "beta": beta, "tau1": tau1, "tau2": tau2}    return bestdef run_nss_pipeline(data, sample_days_per_group=None):    fitted_records = []    key_records = []    metric_records = []    param_records = []    summary_records = []    groups = data.groupby(["curve_type", "year"])    for (ctype, year), g in groups:        g = g.sort_values("date").reset_index(drop=True)        if sample_days_per_group is not None:            unique_days = g["date"].unique()            if len(unique_days) > sample_days_per_group:                selected = np.random.choice(unique_days, sample_days_per_group, replace=False)                g = g[g["date"].isin(selected)].reset_index(drop=True)        x_all = g["maturity_years"].values        y_all = g["yield_percent"].values        best = fit_nss_one_curve(x_all, y_all)        beta, tau1, tau2 = best["beta"], best["tau1"], best["tau2"]        logger.log(f"NSS 参数: {ctype} {year} tau1={tau1}, tau2={tau2}, beta0={beta[0]:.4f}")        param_records.append({            "curve_type": ctype, "year": year,            "beta0": beta[0], "beta1": beta[1], "beta2": beta[2], "beta3": beta[3],            "tau1": tau1, "tau2": tau2        })        # 拟合曲线        x_fine = np.linspace(x_all.min(), x_all.max(), 200)        X_fine = nss_design_matrix(x_fine, tau1, tau2)        y_fine = X_fine @ beta        fitted_records.append(pd.DataFrame({            "curve_type": ctype, "year": year,            "maturity_years": x_fine, "fitted_yield": y_fine        }))        # 关键期限        X_key = nss_design_matrix(KEY_TENORS, tau1, tau2)        y_key = X_key @ beta        key_records.append(pd.DataFrame({            "curve_type": ctype, "year": year,            "maturity_years": KEY_TENORS, "key_yield": y_key,            "tenor_label": [tenor_label(t) for t in KEY_TENORS]        }))        # 误差        X_all = nss_design_matrix(x_all, tau1, tau2)        y_pred = X_all @ beta        resid = y_all - y_pred        metric_records.append({            "curve_type": ctype, "year": year,            "rmse_bp": np.sqrt(np.mean(resid**2)) * 100,            "mae_bp": np.mean(np.abs(resid)) * 100,            "max_err_bp": np.max(np.abs(resid)) * 100,            "r2": 1 - np.sum(resid**2) / np.sum((y_all - np.mean(y_all))**2)        })        summary_records.append({            "curve_type": ctype, "year": year,            "n_obs": len(g),            "mean_yield": np.mean(y_all),            "std_yield": np.std(y_all),            "min_yield": np.min(y_all),            "max_yield": np.max(y_all)        })    fitted_df = pd.concat(fitted_records, ignore_index=True)    key_df = pd.concat(key_records, ignore_index=True)    metric_df = pd.DataFrame(metric_records)    param_df = pd.DataFrame(param_records)    summary_df = pd.DataFrame(summary_records)    return fitted_df, key_df, metric_df, param_df, summary_df# 运行 NSS —— 同样使用 sample_days_per_group 控制数据量logger.log_section("开始 NSS 模型")nss_fitted, nss_key, nss_metrics, nss_params, nss_summary = run_nss_pipeline(    data, sample_days_per_group=80  # <-- 与 KRR 保持一致,或改为 None 使用全部数据)# 保存 NSS 结果save_with_timestamp(nss_fitted, "nss_fitted_curve", ts)save_with_timestamp(nss_key, "nss_key_tenor_yields", ts)save_with_timestamp(nss_metrics, "nss_fit_metrics", ts)save_with_timestamp(nss_params, "nss_params", ts)save_with_timestamp(nss_summary, "nss_annual_summary", ts)logger.log_section("NSS 完成")logger.log_param("RMSE 均值 (bp)", nss_metrics["rmse_bp"].mean())logger.log_param("MAE 均值 (bp)", nss_metrics["mae_bp"].mean())# 第7步:KRR vs NSS 对比分析krr_metrics["model"] = "KRR"nss_metrics["model"] = "NSS"compare = pd.concat([krr_metrics, nss_metrics], ignore_index=True)logger.log_section("模型对比")comparison_table = compare.groupby(["year", "curve_type", "model"])[["rmse_bp", "mae_bp", "r2"]].mean()print(comparison_table)# 保存对比结果save_with_timestamp(compare, "model_comparison", ts)# 按曲线类型汇总logger.log("\n按曲线类型汇总:")summary_by_type = compare.groupby(["curve_type", "model"])[["rmse_bp", "mae_bp", "r2"]].mean()print(summary_by_type)# 第8步:可视化对比fig, axes = plt.subplots(2, 2, figsize=(14, 10))# 图1: RMSE 对比ax1 = axes[0, 0]for model, grp in compare.groupby("model"):    ax1.plot(grp["year"], grp["rmse_bp"], "o-", label=model)ax1.set_xlabel("年份")ax1.set_ylabel("RMSE (bp)")ax1.set_title("RMSE 对比: KRR vs NSS")ax1.legend()ax1.grid(True)# 图2: MAE 对比ax2 = axes[0, 1]for model, grp in compare.groupby("model"):    ax2.plot(grp["year"], grp["mae_bp"], "o-", label=model)ax2.set_xlabel("年份")ax2.set_ylabel("MAE (bp)")ax2.set_title("MAE 对比: KRR vs NSS")ax2.legend()ax2.grid(True)# 图3: R2 对比ax3 = axes[1, 0]for model, grp in compare.groupby("model"):    ax3.plot(grp["year"], grp["r2"], "o-", label=model)ax3.set_xlabel("年份")ax3.set_ylabel("R²")ax3.set_title("R² 对比: KRR vs NSS")ax3.legend()ax3.grid(True)# 图4: 按曲线类型分组的 RMSEax4 = axes[1, 1]pivot = compare.pivot_table(values="rmse_bp", index=["year", "curve_type"], columns="model")pivot.plot(kind="bar", ax=ax4)ax4.set_xlabel("(年份, 曲线类型)")ax4.set_ylabel("RMSE (bp)")ax4.set_title("按曲线类型 RMSE 对比")ax4.legend(title="模型")ax4.tick_params(axis="x", rotation=45)plt.tight_layout()plot_file = f"krr_nss_comparison_{ts}.png"plt.savefig(plot_file, dpi=150, bbox_inches="tight")plt.show()logger.log(f"对比图已保存: {plot_file}")# 第9步:收益率曲线拟合效果图fig, axes = plt.subplots(2, 3, figsize=(16, 10))years = sorted(data["year"].unique())for idx, year in enumerate(years[:6]):    ax = axes[idx // 3, idx % 3]    year_data = data[data["year"] == year]        for ctype in ["AAA", "中债曲线"]:        g = year_data[year_data["curve_type"] == ctype]        if g.empty:            continue        ax.scatter(g["maturity_years"], g["yield_percent"], alpha=0.3, s=10, label=f"{ctype} 原始")                # KRR 拟合        krr_line = krr_fitted[(krr_fitted["year"] == year) & (krr_fitted["curve_type"] == ctype)]        if not krr_line.empty:            ax.plot(krr_line["maturity_years"], krr_line["fitted_yield"], "--", lw=2, label=f"{ctype} KRR")                # NSS 拟合        nss_line = nss_fitted[(nss_fitted["year"] == year) & (nss_fitted["curve_type"] == ctype)]        if not nss_line.empty:            ax.plot(nss_line["maturity_years"], nss_line["fitted_yield"], "-", lw=2, label=f"{ctype} NSS")        ax.set_xlabel("期限 (年)")    ax.set_ylabel("收益率 (%)")    ax.set_title(f"{year}年 收益率曲线")    ax.legend(fontsize=8)    ax.grid(True)plt.tight_layout()curve_file = f"yield_curves_{ts}.png"plt.savefig(curve_file, dpi=150, bbox_inches="tight")plt.show()logger.log(f"收益率曲线图已保存: {curve_file}")# 第10步:完成日志logger.finish()print("\n=== 所有输出文件 ===")import osfor f in sorted(os.listdir(".")):    if ts in f:        print(f"  {f}")