#复现代码
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold, GridSearchCV, cross_val_score
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPRegressor
import warnings
import logging
import sys
from datetime import datetime
import os
# Silence every warning category for the whole run; this also hides
# warnings emitted by the imported libraries.
warnings.filterwarnings('ignore')
# ============================================================
# 0. 设置日志记录
# ============================================================
def setup_logging():
    """Configure the ``ANN_Model`` logger to write to the console and a file.

    A ``logs`` directory is created in the current working directory when it
    is missing, and a new file named ``run_log_<YYYYmmdd_HHMMSS>.log`` is
    opened inside it. Handlers left on the logger by an earlier call are
    removed *and closed* first, so calling this function again (for example
    when a notebook cell is re-run) neither duplicates every message nor
    leaks the previously opened log file.

    Returns:
        tuple: ``(logger, log_file)`` - the configured ``logging.Logger`` and
        the path of the log file it writes to.
    """
    log_dir = 'logs'
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(log_dir, exist_ok=True)

    # The timestamp in the file name keeps one log file per run.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = os.path.join(log_dir, f'run_log_{timestamp}.log')

    logger = logging.getLogger('ANN_Model')
    logger.setLevel(logging.INFO)

    # Clearing the handler list alone would keep the old file stream open;
    # detach and close each stale handler explicitly. Closing a plain
    # StreamHandler does not close sys.stdout.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    console_handler = logging.StreamHandler(sys.stdout)
    for handler in (file_handler, console_handler):
        handler.setLevel(logging.INFO)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    # Run header, written to both destinations.
    logger.info(f"日志文件: {log_file}")
    logger.info(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info("="*80)
    return logger, log_file
# Initialise logging once, at import time. `logger` and `log_file` are
# module-level names used by the rest of the script.
logger, log_file = setup_logging()
# ============================================================
# 1. Load the data (with encoding fallbacks)
# ============================================================
_ds_dir = '/kaggle/input/chenxinting-credit-data/'
_csv_path = None
if os.path.isdir(_ds_dir):
    # os.listdir() returns entries in arbitrary order; sorting makes the
    # choice of "first CSV" the same on every run.
    _files = sorted(os.listdir(_ds_dir))
    logger.info(f'Dataset files: {_files}')
    # Accept upper-case extensions such as ".CSV" as well.
    _csv_files = [f for f in _files if f.lower().endswith('.csv')]
    if _csv_files:
        _csv_path = os.path.join(_ds_dir, _csv_files[0])
        logger.info(f'Using CSV: {_csv_path}')
    else:
        logger.error(f'No CSV found in {_ds_dir}: {_files}')
else:
    # List what is actually mounted to help diagnose a wrong dataset name.
    _all = os.listdir('/kaggle/input/') if os.path.exists('/kaggle/input/') else []
    logger.error(f'Dataset dir not found. Available: {_all}')

# Fail with the real cause. Previously the FileNotFoundError was swallowed by
# a bare `except:` and the script died later inside pd.read_csv(None, ...).
if _csv_path is None:
    raise FileNotFoundError('No CSV file found')

logger.info("正在读取数据...")
# Try the candidate encodings in order. Only decoding failures trigger the
# next attempt; any other error (parse error, interrupt, ...) propagates.
for _encoding in ('utf-8-sig', 'gbk', 'utf-8'):
    try:
        df = pd.read_csv(_csv_path, encoding=_encoding)
    except UnicodeDecodeError as _exc:
        _last_decode_error = _exc
    else:
        logger.info(f"使用 {_encoding} 编码成功")
        break
else:
    # Every encoding failed: surface the last decoding error.
    raise _last_decode_error

logger.info(f"数据形状: {df.shape}")
logger.info(f"数据列数: {len(df.columns)}")
logger.info(f"年份范围: {df['年份'].min()} - {df['年份'].max()}")
# ============================================================
# 2. Preprocessing
# ============================================================
target_col = '未来一年核销的贷款呆账'

# Abort early when the regression target is missing from the data.
_target_present = target_col in df.columns
if not _target_present:
    logger.error(f"目标列 '{target_col}' 不存在!")
    raise ValueError(f"目标列 '{target_col}' 不存在!请检查列名。")

# Drop every row that has a missing value in any column, logging the row
# count before and after.
_rows_before = len(df)
logger.info(f"删除缺失值前数据量: {_rows_before}")
df = df.dropna(axis=0, how='any')
_rows_after = df.shape[0]
logger.info(f"删除缺失值后数据量: {_rows_after}")
# ============================================================
# 3. Feature columns
# ============================================================
year_col = '年份'
# Management indicator: part of the "with management" feature set only.
mgmt_col = '贷款呆账准备本年提取'
exclude_cols = [
    '银行代码', '银行中文简称', year_col,
    '贷款呆账准备年末值',
    target_col
]

# Without this column the "with management indicator" experiment cannot be
# built; fail with a clear message instead of a KeyError in every round.
if mgmt_col not in df.columns:
    logger.error(f"管理指标列 '{mgmt_col}' 不存在!")
    raise ValueError(f"管理指标列 '{mgmt_col}' 不存在!请检查列名。")

# The base feature set must NOT contain the management indicator. Before,
# it was never excluded, so both feature sets held exactly the same columns
# and the Include_Mgmt comparison compared a model with itself.
feature_cols = [
    c for c in df.columns
    if c not in exclude_cols and c != mgmt_col
]
# A plain list keeps the column order stable between runs; list(set(...))
# ordered the columns by string hash, which changes from run to run.
feature_cols_with_mgmt = feature_cols + [mgmt_col]

logger.info(f"基础特征数量: {len(feature_cols)}")
logger.info(f"包含管理指标特征数量: {len(feature_cols_with_mgmt)}")
# ============================================================
# 4. Scaler
# ============================================================
# Single module-level scaler. get_data() re-fits it on the training rows of
# each call, so it only ever holds the statistics of the most recent call.
scaler = StandardScaler()
# ============================================================
# 5. 获取训练/测试数据
# ============================================================
def get_data(df, train_end_year, test_year, include_mgmt=False):
    """Build standardised train/test arrays for one train/test year pair.

    Rows whose ``year_col`` value is ``<= train_end_year`` form the training
    set; rows whose value equals ``test_year`` form the test set. The
    module-level ``scaler`` is fitted on the training features only and then
    applied to the test features.

    Args:
        df: Frame containing ``year_col``, ``target_col`` and the feature
            columns.
        train_end_year: Last year (inclusive) used for training.
        test_year: Year used for testing.
        include_mgmt: Use ``feature_cols_with_mgmt`` when true, otherwise
            ``feature_cols``.

    Returns:
        tuple: ``(X_train, y_train, X_test, y_test)``.

    Raises:
        ValueError: If the training or the test subset has no rows.
    """
    if include_mgmt:
        columns = feature_cols_with_mgmt
    else:
        columns = feature_cols

    years = df[year_col]
    train_rows = df[years <= train_end_year]
    test_rows = df[years == test_year]

    # Guard clauses: an empty subset cannot be scaled or evaluated.
    n_train = len(train_rows)
    n_test = len(test_rows)
    if n_train == 0:
        raise ValueError(f"训练数据为空!train_end_year={train_end_year}")
    if n_test == 0:
        raise ValueError(f"测试数据为空!test_year={test_year}")

    # Fit on the training rows only, then reuse those statistics for test.
    X_train = scaler.fit_transform(train_rows[columns])
    X_test = scaler.transform(test_rows[columns])
    y_train = train_rows[target_col].values
    y_test = test_rows[target_col].values

    logger.debug(f" Train: {n_train} 条, Test: {n_test} 条, 特征数: {len(columns)}")
    return X_train, y_train, X_test, y_test
# ============================================================
# 6. 交叉验证评估
# ============================================================
def cv_eval(model, X, y, cv=10):
    """Estimate MAE and RMSE of ``model`` with shuffled K-fold cross-validation.

    The number of folds is ``min(cv, len(y))``. Both metrics are taken from a
    single ``cross_validate`` run, so each fold is fitted once instead of
    once per metric. The splits use a fixed ``random_state``, so the values
    match those of two separate ``cross_val_score`` calls.

    Args:
        model: Unfitted or fitted scikit-learn regressor.
        X: Feature matrix.
        y: Target values.
        cv: Requested number of folds.

    Returns:
        tuple: ``(mae, rmse)``. RMSE is the square root of the mean fold MSE.
        Both are ``nan`` when fewer than two folds are possible or when the
        cross-validation raises.
    """
    n_splits = min(cv, len(y))
    if n_splits < 2:
        # K-fold needs at least two folds.
        return np.nan, np.nan

    # Local import: the file-level import list does not include this name.
    from sklearn.model_selection import cross_validate

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    try:
        scores = cross_validate(
            model, X, y, cv=kf,
            scoring=('neg_mean_absolute_error', 'neg_mean_squared_error'),
        )
        # Scorers are negated errors; flip the sign back.
        mae = -scores['test_neg_mean_absolute_error'].mean()
        rmse = np.sqrt(-scores['test_neg_mean_squared_error'].mean())
    except Exception as e:
        # Best effort: one failed evaluation must not abort the whole run.
        logger.warning(f"CV计算失败: {e}")
        mae, rmse = np.nan, np.nan
    return mae, rmse
# ============================================================
# 7. Year setup
# ============================================================
# .tolist() converts NumPy scalars to plain Python values, which keeps the
# logged lists readable (newer NumPy versions print scalars inside a list
# as e.g. "np.int64(2010)").
available_years = sorted(df[year_col].unique().tolist())
logger.info(f"数据中可用的年份: {available_years}")

# Pair each year with its successor: train up to year t, test on the next
# available year.
train_years = available_years[:-1]
test_years = available_years[1:]

if not train_years:
    logger.error("数据年份不足,无法进行时间序列预测")
    # exit() is the interactive helper injected by `site` and may be absent;
    # sys.exit also lets the process report failure with a non-zero status.
    sys.exit(1)

logger.info(f"训练年份数: {len(train_years)}")
logger.info(f"训练年份: {train_years}")
logger.info(f"测试年份: {test_years}")
# ============================================================
# 8. Result containers
# ============================================================
# Two independent lists of result dicts: cross-validation metrics and
# out-of-sample (test year) metrics.
cv_results, out_results = [], []
# ============================================================
# 9. Main loop - artificial neural network only
# ============================================================
# Base estimator and hyper-parameter grid. Both are the same in every round,
# and GridSearchCV only ever fits clones of `ann`, so they are built once.
ann = MLPRegressor(
    solver='adam',
    max_iter=2000,
    random_state=42,
    early_stopping=True,
    validation_fraction=0.1,
    n_iter_no_change=50
)
ann_grid = {
    'hidden_layer_sizes': [(50,), (100,), (50, 50), (100, 50), (100, 100)],
    'alpha': [0.0001, 0.001, 0.01],
    'learning_rate_init': [0.001, 0.01]
}


def _run_ann_experiment(X_train, y_train, X_test, y_test,
                        train_year, test_year, include_mgmt):
    """Tune, cross-validate and test one ANN, appending to the result lists.

    Appends one record to ``cv_results`` (cross-validated MAE/RMSE of the
    best estimator) and one to ``out_results`` (MAE/RMSE on the test year).
    Exceptions propagate to the caller.
    """
    search = GridSearchCV(
        ann, ann_grid, cv=3,
        scoring='neg_mean_squared_error',
        n_jobs=-1
    )
    search.fit(X_train, y_train)
    best = search.best_estimator_
    logger.info(f" 最佳参数: {search.best_params_}")
    # best_score_ is a negated MSE; flip the sign for display.
    logger.info(f" 最佳得分: {-search.best_score_:.4f}")

    cv_mae, cv_rmse = cv_eval(best, X_train, y_train)
    cv_results.append({
        'Year': train_year, 'Model': 'ANN', 'Include_Mgmt': include_mgmt,
        'MAE': cv_mae, 'RMSE': cv_rmse,
        'Best_Params': str(search.best_params_)
    })

    # No explicit refit here: GridSearchCV (refit=True by default) has
    # already fitted `best` on the full training set, and the
    # cross-validation above works on clones. With the fixed random_state a
    # second fit would reproduce the same model.
    pred = best.predict(X_test)
    out_mae = np.mean(np.abs(pred - y_test))
    out_rmse = np.sqrt(mean_squared_error(y_test, pred))
    out_results.append({
        'Train_Year': train_year, 'Test_Year': test_year, 'Model': 'ANN',
        'Include_Mgmt': include_mgmt, 'MAE': out_mae, 'RMSE': out_rmse
    })
    logger.info(f" 样本外 MAE: {out_mae:.4f}, RMSE: {out_rmse:.4f}")


logger.info("\n" + "="*60)
logger.info("开始人工神经网络(ANN)模型训练与评估")
logger.info("="*60)
# Start of the whole run; read again when the total run time is reported.
start_time = datetime.now()
logger.info(f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")

for idx, (tr, te) in enumerate(zip(train_years, test_years)):
    logger.info(f"\n--- 第 {idx+1}/{len(train_years)} 轮: 训练年份 {tr}, 测试年份 {te} ---")
    round_start = datetime.now()
    try:
        data_base = get_data(df, tr, te, include_mgmt=False)
        data_mgmt = get_data(df, tr, te, include_mgmt=True)
    except Exception as e:
        # Skip this round but keep going with the remaining years.
        logger.error(f"数据获取失败: {e}")
        continue

    # 9.1 without / 9.2 with the management indicator. Each experiment is
    # isolated so that a failure in one does not prevent the other.
    for include_mgmt, label, split in (
        (False, "不含管理指标", data_base),
        (True, "包含管理指标", data_mgmt),
    ):
        logger.info(f" 训练ANN({label})...")
        try:
            _run_ann_experiment(*split, tr, te, include_mgmt)
        except Exception as e:
            logger.error(f" ANN训练失败: {e}")

    # Wall-clock time of this round.
    round_end = datetime.now()
    logger.info(f" 本轮耗时: {(round_end - round_start).total_seconds():.2f} 秒")
# ============================================================
# 10. Report and save the results
# ============================================================
_rule_60 = "=" * 60
logger.info("\n" + _rule_60)
logger.info("结果汇总")
logger.info(_rule_60)

cv_df = pd.DataFrame(cv_results)
out_df = pd.DataFrame(out_results)

# Cross-validation table (or a warning when nothing was recorded).
if len(cv_df) == 0:
    logger.warning("\n⚠️ 交叉验证结果为空")
else:
    _cv_columns = ['Year', 'Include_Mgmt', 'MAE', 'RMSE']
    logger.info("\n===== 交叉验证结果 =====")
    logger.info("\n" + cv_df[_cv_columns].to_string())

# Out-of-sample table (or a warning when nothing was recorded).
if len(out_df) == 0:
    logger.warning("\n⚠️ 样本外测试结果为空")
else:
    _out_columns = ['Train_Year', 'Test_Year', 'Include_Mgmt', 'MAE', 'RMSE']
    logger.info("\n===== 样本外测试结果 =====")
    logger.info("\n" + out_df[_out_columns].to_string())

# Save as Excel; if either Excel write fails, save both tables as CSV.
try:
    for _frame, _xlsx_name in (
        (cv_df, "ann_cv_results.xlsx"),
        (out_df, "ann_out_results.xlsx"),
    ):
        _frame.to_excel(_xlsx_name, index=False)
except Exception as e:
    logger.warning(f"保存Excel失败: {e}")
    cv_df.to_csv("ann_cv_results.csv", index=False)
    out_df.to_csv("ann_out_results.csv", index=False)
    logger.info("✅ 结果已保存到 ann_cv_results.csv 和 ann_out_results.csv")
else:
    logger.info("\n✅ 结果已保存到 ann_cv_results.xlsx 和 ann_out_results.xlsx")
# ============================================================
# 11. Summary statistics
# ============================================================
logger.info("\n===== 统计摘要 =====")
logger.info(f"总共完成 {len(cv_results)} 条交叉验证记录")
logger.info(f"总共完成 {len(out_results)} 条样本外测试记录")

# Mean and standard deviation of both metrics, per Include_Mgmt group.
_metric_stats = {
    'MAE': ['mean', 'std'],
    'RMSE': ['mean', 'std']
}

if cv_results:
    logger.info("\n--- 交叉验证平均性能 ---")
    _cv_grouped = cv_df.groupby('Include_Mgmt')
    summary_cv = _cv_grouped.agg(_metric_stats).round(2)
    logger.info(f"\n{summary_cv!s}")

if out_results:
    logger.info("\n--- 样本外测试平均性能 ---")
    _out_grouped = out_df.groupby('Include_Mgmt')
    summary_out = _out_grouped.agg(_metric_stats).round(2)
    logger.info(f"\n{summary_out!s}")
# ============================================================
# 12. Run footer
# ============================================================
end_time = datetime.now()
_elapsed = end_time - start_time
total_time = _elapsed.total_seconds()

_rule_80 = "=" * 80
_end_stamp = end_time.strftime('%Y-%m-%d %H:%M:%S')
_minutes = total_time / 60

logger.info("\n" + _rule_80)
logger.info(f"运行结束时间: {_end_stamp}")
logger.info(f"总耗时: {total_time:.2f} 秒 ({_minutes:.2f} 分钟)")
logger.info(f"日志文件: {log_file}")
logger.info(_rule_80)
# Final confirmation goes straight to stdout, bypassing the logger.
print(f"\n✅ 运行完成!日志已保存到: {log_file}")