In [1]:
# 第0步:安装依赖(JupyterLite 需要先安装 openpyxl)import micropipawait micropip.install("openpyxl")print("openpyxl 安装完成")# 第1步:导入库import warningsimport datetimeimport timefrom pathlib import Pathimport reimport numpy as npimport pandas as pdimport matplotlib.pyplot as pltwarnings.filterwarnings("ignore")plt.rcParams["axes.unicode_minus"] = Falsefor font in ["Noto Sans CJK SC", "WenQuanYi Micro Hei", "SimHei", "Microsoft YaHei"]: try: plt.rcParams["font.sans-serif"] = [font] break except Exception: continueprint("库导入完成")# 第2步:时间戳与日志工具def get_timestamp(): return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")def get_readable_time(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")class RunLogger: def __init__(self, prefix="run"): self.timestamp = get_timestamp() self.readable_time = get_readable_time() self.log_file = f"{prefix}_log_{self.timestamp}.txt" self.lines = [] self.log("=" * 60) self.log("运行日志") self.log(f"开始时间: {self.readable_time}") self.log(f"时间戳: {self.timestamp}") self.log("=" * 60) def log(self, message): self.lines.append(message) print(message) def log_section(self, title): self.log("") self.log(f"【{title}】") self.log("-" * 40) def log_param(self, name, value): self.log(f" {name}: {value}") def log_result(self, name, value): self.log(f" {name}: {value}") def finish(self): self.log("") self.log("=" * 60) self.log("运行结束") self.log(f"总耗时: {elapsed:.2f} 秒") self.log("=" * 60) with open(self.log_file, "w", encoding="utf-8") as f: f.write("\n".join(self.lines)) print(f"\n日志已保存: {self.log_file}") return self.log_filedef save_with_timestamp(df, prefix, timestamp, suffix="csv"): filename = f"{prefix}_{timestamp}.{suffix}" df.to_csv(filename, index=False, encoding="utf-8-sig") return filename# 初始化日志ts = get_timestamp()logger = RunLogger(prefix="yield_curve")logger.log_section("初始化")logger.log_param("时间戳", ts)logger.log_param("环境", "JupyterLite / Pyodide")# 第3步:全局配置KEY_TENORS = np.array([ 0.02, 0.04, 0.08, 0.17, 0.25, 0.50, 0.75, 1.0, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0, 20.0, 30.0, 50.0])def classify_curve_type(file_name): name = str(file_name) if "AAA" in name.upper(): return "AAA" if "中债曲线" in name: return "中债曲线" return Nonedef tenor_label(x): x = float(x) if abs(x - 0.02) < 0.006: return "7D" if abs(x - 0.04) < 0.006: return "14D" if x < 1: return f"{round(x * 12)}M" if abs(x - round(x)) < 1e-8: return f"{int(round(x))}Y" return f"{x:.2f}Y"def extract_file_year(file_name): match = re.search(r"(20\d{2})", str(file_name)) return int(match.group(1)) if match else Nonelogger.log_section("配置参数")logger.log_param("关键期限", [tenor_label(t) for t in KEY_TENORS])# 第4步:读取数据def read_yield_excel(path): curve_type = classify_curve_type(path.name) if curve_type is None: return pd.DataFrame() xls = pd.ExcelFile(path) sheet = xls.sheet_names[0] df = pd.read_excel(path, sheet_name=sheet) df.columns = [str(c).strip() for c in df.columns] rename = { "日期": "date", "标准期限说明": "tenor_text", "标准期限(年)": "maturity_years", "收益率(%)": "yield_percent", "收益率": "yield_percent", } df = df.rename(columns={c: rename.get(c, c) for c in df.columns}) required = ["date", "maturity_years", "yield_percent"] for col in required: if col not in df.columns: return pd.DataFrame() df = df[required].copy() df["date"] = pd.to_datetime(df["date"], errors="coerce") df["maturity_years"] = pd.to_numeric(df["maturity_years"], errors="coerce") df["yield_percent"] = pd.to_numeric(df["yield_percent"], errors="coerce") df = df.dropna(subset=required) df["curve_type"] = curve_type df["year"] = df["date"].dt.year return df# 读取所有 Excel 文件files = sorted(Path("/kaggle/input/datasets/songsammy/yangrui-bond-curves").glob("*.xlsx"))all_data = []for f in files: df = read_yield_excel(f) if not df.empty: all_data.append(df) logger.log(f"读取: {f.name}")data = pd.concat(all_data, ignore_index=True)data = data[(data["maturity_years"] > 0) & (data["yield_percent"] > 0)]data = data.sort_values(["curve_type", "year", "date", "maturity_years"]).reset_index(drop=True)logger.log_section("数据加载完成")logger.log_param("总行数", len(data))logger.log_param("曲线类型", data["curve_type"].unique().tolist())logger.log_param("年份", sorted(data["year"].unique().tolist()))print(data.groupby(["curve_type", "year"]).size())# 第5步:KRR(核岭回归)模型def rbf_kernel(X1, X2, gamma): X1 = np.asarray(X1, dtype=float) X2 = np.asarray(X2, dtype=float) if X1.ndim == 1: X1 = X1.reshape(-1, 1) if X2.ndim == 1: X2 = X2.reshape(-1, 1) sq_dists = (np.sum(X1**2, axis=1).reshape(-1, 1) + np.sum(X2**2, axis=1) - 2 * np.dot(X1, X2.T)) return np.exp(-gamma * sq_dists)def krr_fit(X_train, y_train, gamma, alpha): K = rbf_kernel(X_train, X_train, gamma) n = K.shape[0] A = K + alpha * np.eye(n) return np.linalg.solve(A, y_train)def krr_predict(X_train, alpha_coef, X_test, gamma): K_s = rbf_kernel(X_test, X_train, gamma) return K_s.dot(alpha_coef)def cv_rmse_krr(X, y, gamma, alpha, n_splits=5): n = len(y) if n < n_splits: return np.nan fold_size = n // n_splits errs = [] for i in range(n_splits): start = i * fold_size end = (i + 1) * fold_size if i < n_splits - 1 else n test_idx = list(range(start, end)) train_idx = [j for j in range(n) if j not in test_idx] if len(train_idx) < 2: continue alpha_coef = krr_fit(X[train_idx], y[train_idx], gamma, alpha) y_pred = krr_predict(X[train_idx], alpha_coef, X[test_idx], gamma) errs.append(np.mean((y[test_idx] - y_pred) ** 2)) return np.sqrt(np.mean(errs)) if errs else np.nandef run_krr_pipeline(data, sample_days_per_group=None): cleaned_records = [] param_records = [] fitted_records = [] key_records = [] metric_records = [] summary_records = [] groups = data.groupby(["curve_type", "year"]) for (ctype, year), g in groups: g = g.sort_values("date").reset_index(drop=True) if sample_days_per_group is not None: unique_days = g["date"].unique() if len(unique_days) > sample_days_per_group: selected = np.random.choice(unique_days, sample_days_per_group, replace=False) g = g[g["date"].isin(selected)].reset_index(drop=True) cleaned_records.append(g.copy()) X_all = g["maturity_years"].values.reshape(-1, 1) y_all = g["yield_percent"].values # 超参数搜索 alpha_grid = [1e-5, 3e-5, 1e-4, 3e-4, 1e-3, 3e-3, 1e-2] gamma_grid = [0.01, 0.03, 0.05, 0.1, 0.2, 0.5, 1.0] best_cv = np.inf best_alpha, best_gamma = 1e-3, 0.1 for alpha in alpha_grid: for gamma in gamma_grid: cv = cv_rmse_krr(X_all, y_all, gamma, alpha, n_splits=5) if not np.isnan(cv) and cv < best_cv: best_cv = cv best_alpha, best_gamma = alpha, gamma logger.log(f"KRR 参数选择: {ctype} {year} alpha={best_alpha}, gamma={best_gamma}, CV_RMSE={best_cv:.2f}bp") param_records.append({ "curve_type": ctype, "year": year, "alpha": best_alpha, "gamma": best_gamma, "cv_rmse_bp": best_cv }) # 拟合整条曲线 alpha_coef = krr_fit(X_all, y_all, best_gamma, best_alpha) x_fine = np.linspace(X_all.min(), X_all.max(), 200) y_fine = krr_predict(X_all, alpha_coef, x_fine.reshape(-1, 1), best_gamma) fitted_records.append(pd.DataFrame({ "curve_type": ctype, "year": year, "maturity_years": x_fine, "fitted_yield": y_fine })) # 关键期限收益率 key_y = krr_predict(X_all, alpha_coef, KEY_TENORS.reshape(-1, 1), best_gamma) key_records.append(pd.DataFrame({ "curve_type": ctype, "year": year, "maturity_years": KEY_TENORS, "key_yield": key_y, "tenor_label": [tenor_label(t) for t in KEY_TENORS] })) # 误差指标 y_pred = krr_predict(X_all, alpha_coef, X_all, best_gamma) resid = y_all - y_pred metric_records.append({ "curve_type": ctype, "year": year, "rmse_bp": np.sqrt(np.mean(resid**2)) * 100, "mae_bp": np.mean(np.abs(resid)) * 100, "max_err_bp": np.max(np.abs(resid)) * 100, "r2": 1 - np.sum(resid**2) / np.sum((y_all - np.mean(y_all))**2) }) # 年度统计 summary_records.append({ "curve_type": ctype, "year": year, "n_obs": len(g), "mean_yield": np.mean(y_all), "std_yield": np.std(y_all), "min_yield": np.min(y_all), "max_yield": np.max(y_all) }) cleaned_df = pd.concat(cleaned_records, ignore_index=True) param_df = pd.DataFrame(param_records) fitted_df = pd.concat(fitted_records, ignore_index=True) key_df = pd.concat(key_records, ignore_index=True) metric_df = pd.DataFrame(metric_records) summary_df = pd.DataFrame(summary_records) return cleaned_df, param_df, fitted_df, key_df, metric_df, summary_df# 运行 KRR —— 使用 sample_days_per_group 控制每组采样天数# 设为 80 天与第一次代码效果类似,设为 None 则使用全部数据logger.log_section("开始 KRR 模型")logger.log("提示: 如运行过慢,可调整 sample_days_per_group 参数(如 80)减少数据量")krr_cleaned, krr_params, krr_fitted, krr_key, krr_metrics, krr_summary = run_krr_pipeline( data, sample_days_per_group=80 # <-- 改为 None 使用全部数据,或调整天数)# 保存 KRR 结果save_with_timestamp(krr_cleaned, "krr_cleaned_data", ts)save_with_timestamp(krr_params, "krr_model_params", ts)save_with_timestamp(krr_fitted, "krr_fitted_curve", ts)save_with_timestamp(krr_key, "krr_key_tenor_yields", ts)save_with_timestamp(krr_metrics, "krr_fit_metrics", ts)save_with_timestamp(krr_summary, "krr_annual_summary", ts)logger.log_section("KRR 完成")logger.log_param("RMSE 均值 (bp)", krr_metrics["rmse_bp"].mean())logger.log_param("MAE 均值 (bp)", krr_metrics["mae_bp"].mean())# 第6步:NSS(Nelson-Siegel-Svensson)模型TAU1_GRID = np.array([0.25, 0.50, 0.75, 1.00, 1.50, 2.00, 3.00, 5.00])TAU2_GRID = np.array([2.00, 3.00, 5.00, 7.00, 10.00, 15.00, 20.00, 30.00])def nss_design_matrix(t, tau1, tau2): t = np.asarray(t, dtype=float) t = np.where(t <= 0, 1e-12, t) f1 = (1.0 - np.exp(-t / tau1)) / (t / tau1) f2 = f1 - np.exp(-t / tau1) f3 = (1.0 - np.exp(-t / tau2)) / (t / tau2) - np.exp(-t / tau2) return np.column_stack([np.ones_like(t), f1, f2, f3])def fit_nss_one_curve(x, y): x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) best = {"ssr": np.inf, "beta": np.array([np.nan]*4), "tau1": np.nan, "tau2": np.nan} for tau1 in TAU1_GRID: for tau2 in TAU2_GRID: if tau2 <= tau1: continue X = nss_design_matrix(x, tau1, tau2) try: beta, *_ = np.linalg.lstsq(X, y, rcond=None) except np.linalg.LinAlgError: continue resid = y - X @ beta ssr = float(np.sum(resid ** 2)) if ssr < best["ssr"]: best = {"ssr": ssr, "beta": beta, "tau1": tau1, "tau2": tau2} return bestdef run_nss_pipeline(data, sample_days_per_group=None): fitted_records = [] key_records = [] metric_records = [] param_records = [] summary_records = [] groups = data.groupby(["curve_type", "year"]) for (ctype, year), g in groups: g = g.sort_values("date").reset_index(drop=True) if sample_days_per_group is not None: unique_days = g["date"].unique() if len(unique_days) > sample_days_per_group: selected = np.random.choice(unique_days, sample_days_per_group, replace=False) g = g[g["date"].isin(selected)].reset_index(drop=True) x_all = g["maturity_years"].values y_all = g["yield_percent"].values best = fit_nss_one_curve(x_all, y_all) beta, tau1, tau2 = best["beta"], best["tau1"], best["tau2"] logger.log(f"NSS 参数: {ctype} {year} tau1={tau1}, tau2={tau2}, beta0={beta[0]:.4f}") param_records.append({ "curve_type": ctype, "year": year, "beta0": beta[0], "beta1": beta[1], "beta2": beta[2], "beta3": beta[3], "tau1": tau1, "tau2": tau2 }) # 拟合曲线 x_fine = np.linspace(x_all.min(), x_all.max(), 200) X_fine = nss_design_matrix(x_fine, tau1, tau2) y_fine = X_fine @ beta fitted_records.append(pd.DataFrame({ "curve_type": ctype, "year": year, "maturity_years": x_fine, "fitted_yield": y_fine })) # 关键期限 X_key = nss_design_matrix(KEY_TENORS, tau1, tau2) y_key = X_key @ beta key_records.append(pd.DataFrame({ "curve_type": ctype, "year": year, "maturity_years": KEY_TENORS, "key_yield": y_key, "tenor_label": [tenor_label(t) for t in KEY_TENORS] })) # 误差 X_all = nss_design_matrix(x_all, tau1, tau2) y_pred = X_all @ beta resid = y_all - y_pred metric_records.append({ "curve_type": ctype, "year": year, "rmse_bp": np.sqrt(np.mean(resid**2)) * 100, "mae_bp": np.mean(np.abs(resid)) * 100, "max_err_bp": np.max(np.abs(resid)) * 100, "r2": 1 - np.sum(resid**2) / np.sum((y_all - np.mean(y_all))**2) }) summary_records.append({ "curve_type": ctype, "year": year, "n_obs": len(g), "mean_yield": np.mean(y_all), "std_yield": np.std(y_all), "min_yield": np.min(y_all), "max_yield": np.max(y_all) }) fitted_df = pd.concat(fitted_records, ignore_index=True) key_df = pd.concat(key_records, ignore_index=True) metric_df = pd.DataFrame(metric_records) param_df = pd.DataFrame(param_records) summary_df = pd.DataFrame(summary_records) return fitted_df, key_df, metric_df, param_df, summary_df# 运行 NSS —— 同样使用 sample_days_per_group 控制数据量logger.log_section("开始 NSS 模型")nss_fitted, nss_key, nss_metrics, nss_params, nss_summary = run_nss_pipeline( data, sample_days_per_group=80 # <-- 与 KRR 保持一致,或改为 None 使用全部数据)# 保存 NSS 结果save_with_timestamp(nss_fitted, "nss_fitted_curve", ts)save_with_timestamp(nss_key, "nss_key_tenor_yields", ts)save_with_timestamp(nss_metrics, "nss_fit_metrics", ts)save_with_timestamp(nss_params, "nss_params", ts)save_with_timestamp(nss_summary, "nss_annual_summary", ts)logger.log_section("NSS 完成")logger.log_param("RMSE 均值 (bp)", nss_metrics["rmse_bp"].mean())logger.log_param("MAE 均值 (bp)", nss_metrics["mae_bp"].mean())# 第7步:KRR vs NSS 对比分析krr_metrics["model"] = "KRR"nss_metrics["model"] = "NSS"compare = pd.concat([krr_metrics, nss_metrics], ignore_index=True)logger.log_section("模型对比")comparison_table = compare.groupby(["year", "curve_type", "model"])[["rmse_bp", "mae_bp", "r2"]].mean()print(comparison_table)# 保存对比结果save_with_timestamp(compare, "model_comparison", ts)# 按曲线类型汇总logger.log("\n按曲线类型汇总:")summary_by_type = compare.groupby(["curve_type", "model"])[["rmse_bp", "mae_bp", "r2"]].mean()print(summary_by_type)# 第8步:可视化对比fig, axes = plt.subplots(2, 2, figsize=(14, 10))# 图1: RMSE 对比ax1 = axes[0, 0]for model, grp in compare.groupby("model"): ax1.plot(grp["year"], grp["rmse_bp"], "o-", label=model)ax1.set_xlabel("年份")ax1.set_ylabel("RMSE (bp)")ax1.set_title("RMSE 对比: KRR vs NSS")ax1.legend()ax1.grid(True)# 图2: MAE 对比ax2 = axes[0, 1]for model, grp in compare.groupby("model"): ax2.plot(grp["year"], grp["mae_bp"], "o-", label=model)ax2.set_xlabel("年份")ax2.set_ylabel("MAE (bp)")ax2.set_title("MAE 对比: KRR vs NSS")ax2.legend()ax2.grid(True)# 图3: R2 对比ax3 = axes[1, 0]for model, grp in compare.groupby("model"): ax3.plot(grp["year"], grp["r2"], "o-", label=model)ax3.set_xlabel("年份")ax3.set_ylabel("R²")ax3.set_title("R² 对比: KRR vs NSS")ax3.legend()ax3.grid(True)# 图4: 按曲线类型分组的 RMSEax4 = axes[1, 1]pivot = compare.pivot_table(values="rmse_bp", index=["year", "curve_type"], columns="model")pivot.plot(kind="bar", ax=ax4)ax4.set_xlabel("(年份, 曲线类型)")ax4.set_ylabel("RMSE (bp)")ax4.set_title("按曲线类型 RMSE 对比")ax4.legend(title="模型")ax4.tick_params(axis="x", rotation=45)plt.tight_layout()plot_file = f"krr_nss_comparison_{ts}.png"plt.savefig(plot_file, dpi=150, bbox_inches="tight")plt.show()logger.log(f"对比图已保存: {plot_file}")# 第9步:收益率曲线拟合效果图fig, axes = plt.subplots(2, 3, figsize=(16, 10))years = sorted(data["year"].unique())for idx, year in enumerate(years[:6]): ax = axes[idx // 3, idx % 3] year_data = data[data["year"] == year] for ctype in ["AAA", "中债曲线"]: g = year_data[year_data["curve_type"] == ctype] if g.empty: continue ax.scatter(g["maturity_years"], g["yield_percent"], alpha=0.3, s=10, label=f"{ctype} 原始") # KRR 拟合 krr_line = krr_fitted[(krr_fitted["year"] == year) & (krr_fitted["curve_type"] == ctype)] if not krr_line.empty: ax.plot(krr_line["maturity_years"], krr_line["fitted_yield"], "--", lw=2, label=f"{ctype} KRR") # NSS 拟合 nss_line = nss_fitted[(nss_fitted["year"] == year) & (nss_fitted["curve_type"] == ctype)] if not nss_line.empty: ax.plot(nss_line["maturity_years"], nss_line["fitted_yield"], "-", lw=2, label=f"{ctype} NSS") ax.set_xlabel("期限 (年)") ax.set_ylabel("收益率 (%)") ax.set_title(f"{year}年 收益率曲线") ax.legend(fontsize=8) ax.grid(True)plt.tight_layout()curve_file = f"yield_curves_{ts}.png"plt.savefig(curve_file, dpi=150, bbox_inches="tight")plt.show()logger.log(f"收益率曲线图已保存: {curve_file}")# 第10步:完成日志logger.finish()print("\n=== 所有输出文件 ===")import osfor f in sorted(os.listdir(".")): if ts in f: print(f" {f}")