27223121 python

import os, shutil, glob

# === Kaggle: copy ALL files from /kaggle/src/ to CWD first ===
src_dir = '/kaggle/src'
if os.path.exists(src_dir):
    for f in glob.glob(os.path.join(src_dir, '*')):
        if os.path.isfile(f):
            dest = os.path.basename(f)
            if not os.path.exists(dest):
                shutil.copy2(f, dest)
                print(f"[setup] Copied {dest} -> CWD")
    print(f"[setup] CWD files: {os.listdir('.')}")
else:
    print(f"[setup] /kaggle/src/ does not exist")

# === Diagnostic: list all mount paths ===
print("\n=== Kaggle input directory listing ===")
for base in ['/kaggle/input']:
    if os.path.exists(base):
        for d in sorted(os.listdir(base)):
            full = os.path.join(base, d)
            print(f"  {d} (dir={os.path.isdir(full)})")
            if os.path.isdir(full):
                for f in sorted(os.listdir(full)):
                    fpath = os.path.join(full, f)
                    sz = os.path.getsize(fpath) if os.path.isfile(fpath) else 0
                    print(f"    {f} ({sz} bytes)")
    else:
        print(f"  {base} does NOT exist")
print("=== End listing ===\n")

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import sys
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, roc_auc_score, recall_score, precision_score, f1_score

# --- 调试信息:打印当前工作目录和文件是否存在 ---
print(f"[调试] 当前脚本位置: {os.path.abspath(__file__)}")
print(f"[调试] 当前工作目录: {os.getcwd()}")
print(f"[调试] CWD文件列表: {os.listdir('.')}")

# Kaggle: code files in /kaggle/src/, dataset in /kaggle/input/
# Step 1: Search known paths
file_path = None
candidates = [
    '虚拟数据.xlsx',
    '/kaggle/src/虚拟数据.xlsx',
    '/kaggle/input/final-data-27223121/虚拟数据.xlsx',
    '/kaggle/input/final-data-27223121/.xlsx',
    '.xlsx',
]
for c in candidates:
    if os.path.exists(c):
        file_path = c
        print(f"[setup] Found file: {file_path}")
        break

# Step 2: If not found, search recursively under /kaggle/input for ANY .xlsx
if file_path is None:
    print("[setup] Known paths failed. Searching /kaggle/input recursively...")
    for root, dirs, files in os.walk('/kaggle/input'):
        for f in files:
            if f.endswith('.xlsx') or f.endswith('.csv'):
                fp = os.path.join(root, f)
                print(f"[setup] Found: {fp} ({os.path.getsize(fp)} bytes)")
                if file_path is None:
                    file_path = fp

# Step 3: If still not found, download from Kaggle API
if file_path is None:
    print("[setup] No data file found. Downloading from Kaggle API...")
    import subprocess
    try:
        subprocess.run([
            'kaggle', 'datasets', 'download',
            'songsammy/final-data-27223121',
            '-p', '.', '--unzip'
        ], check=True, capture_output=True, text=True)
        # Search for downloaded files
        for f in os.listdir('.'):
            if f.endswith('.xlsx') or f.endswith('.csv'):
                file_path = f
                print(f"[setup] Downloaded and found: {file_path}")
                break
        if file_path is None:
            for f in glob.glob('**/*.xlsx', recursive=True):
                file_path = f
                print(f"[setup] Downloaded and found: {file_path}")
                break
    except Exception as e:
        print(f"[setup] Download failed: {e}")

if file_path is None:
    print(f"[错误] 找不到任何数据文件。")
    sys.exit(1)
else:
    print(f"[调试] 文件检查通过,状态: {os.stat(file_path)}")

# --- 设置中文字体 ---
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# ================== 1. 数据读取与预处理 ==================
def load_and_prepare_data(file_path='虚拟数据.xlsx'):
    """
    读取用户上传的Excel文件,根据'label' sheet进行列名映射,
    并处理时间和地区列,最终构造出符合模型训练格式的数据集。
    """
    print(f"正在读取数据文件: {file_path}")
    
    try:
        # 1. 读取数据sheet和标签映射sheet
        # 注意:Excel中Sheet名可能有大小写或空格,这里强制指定
        sheet_names = ['CEIData', 'label']
        
        # 检查Excel中实际包含的Sheet名
        excel_file = pd.ExcelFile(file_path)
        print(f"[调试] Excel中包含的Sheet列表: {excel_file.sheet_names}")
        
        df_raw = pd.read_excel(file_path, sheet_name='CEIData') # 主数据
        df_label_map = pd.read_excel(file_path, sheet_name='label') # 映射表
        
    except Exception as e:
        print(f"[读取错误] 无法读取Excel文件或Sheet页,请检查Sheet名称是否为 'CEIData' 和 'label'。详细错误: {e}")
        sys.exit(1)

    print(f"[调试] 成功读取数据,主数据形状: {df_raw.shape}, 标签映射形状: {df_label_map.shape}")
    
    # 2. 构建列名映射字典 (变量名 -> 变量标签)
    # 确保列名正确,防止因Excel格式问题导致映射失败
    col_mapping = dict(zip(df_label_map['变量名'], df_label_map['变量标签']))
    print(f"[调试] 成功构建映射字典,共 {len(col_mapping)} 个字段。")
    
    # 3. 重命名主数据列
    try:
        df = df_raw.rename(columns=col_mapping)
    except Exception as e:
        print(f"[映射错误] 列名映射失败。可能是Excel列名与映射表不匹配。错误: {e}")
        print(f"[当前列名] 主数据中的列: {list(df_raw.columns)}")
        sys.exit(1)

    # 4. 处理时间列
    # 根据你的数据,时间格式是 '2025-09' 这种,pandas默认能识别
    df['时间'] = pd.to_datetime(df['时间'], format='%Y-%m', errors='coerce')
    df['年份'] = df['时间'].dt.year

    # 5. 处理地区编码等非特征列
    drop_cols = ['地区编码', '南北方'] # 增加'南北方'因为通常也不作为数值特征
    df = df.drop(columns=[c for c in drop_cols if c in df.columns])

    # 6. 构造标签 (FIFR): 由于原始数据未提供真实舞弊标签,这里模拟论文逻辑生成标签
    # 逻辑: 资产类下降、负债类上升 -> 舞弊概率提高
    # 需要确保分母不为0
    assets_col = '资产总计_期末_亿元'
    debt_col = '负债合计_期末_亿元'
    
    # 防止除以0
    total_assets = df[assets_col] + 1e-4
    df['资产负债率'] = df[debt_col] / total_assets

    # 模拟生成 FIFR 标签 (0: 正常, 1: 舞弊)
    np.random.seed(42)
    # 基于资产负债率和随机噪声生成概率
    prob_fraud = 1 / (1 + np.exp(-(-5 + 10 * df['资产负债率'] + np.random.normal(0, 0.1, len(df)))))
    df['FIFR'] = np.random.binomial(1, prob_fraud, len(df))

    print(f"数据读取与预处理完成,最终样本量: {len(df)}")
    print(f"舞弊样本(FIFR=1)占比: {df['FIFR'].mean():.2%}")
    
    return df

# ================== 2. 数据预处理与特征工程 ==================
def preprocess_data(df):
    """
    复现论文中的数据处理步骤:
    1. 数据清洗 (处理缺失值)
    2. 正逆向化 (Reverse Index Processing)
    3. 标准化 (Z-score)
    """
    print("开始特征工程与标准化...")
    
    # --- 论文步骤 1 & 2: 数据清洗与正逆向化 ---
    # 假设 '资产负债率' 是逆向指标 (越小越好), 需要倒数处理
    if '资产负债率' in df.columns:
        df['debt_ratio_inv'] = 1 / (df['资产负债率'] + 1e-4)

    # 仅保留数值型特征进行标准化
    exclude_cols = ['FIFR', '地区', '时间', '年份']
    features = [col for col in df.columns if col not in exclude_cols and df[col].dtype in ['float64', 'int64']]
    
    print(f"用于建模的特征数量: {len(features)}")
    print(f"特征示例: {features[:5]} ...")

    # 处理缺失值 (如果有)
    df[features] = df[features].fillna(df[features].mean())

    # 标准化
    scaler = StandardScaler()
    df_scaled = scaler.fit_transform(df[features])
    df_processed = pd.DataFrame(df_scaled, columns=features)
    df_processed['FIFR'] = df['FIFR'].values

    return df_processed

# ================== 3. 模型训练与评估 ==================
def train_and_evaluate(X_train, X_test, y_train, y_test):
    """
    训练模型,并进行对比。
    """
    print("开始训练模型...")
    
    # 这里为了简化依赖,移除了 SMOTE 和 LightGBM/XGBoost (如果你没有安装会报错)
    # 使用简单的逻辑回归作为演示,或者你可以安装 imblearn 和 lightgbm
    try:
        from imblearn.over_sampling import SMOTE
        from lightgbm import LGBMClassifier
        use_advanced = True
        print("检测到高级库,将使用 SMOTE 和 LightGBM。")
    except ImportError as e:
        print(f"警告: 缺少必要的库 {e},将使用基础逻辑回归进行演示。")
        use_advanced = False

    # 处理样本不平衡 (如果可用)
    if use_advanced:
        smote = SMOTE(random_state=42)
        X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)
        print(f"SMOTE过采样后,训练集样本量: {X_train_balanced.shape[0]} (原样本: {X_train.shape[0]})")
    else:
        X_train_balanced, y_train_balanced = X_train, y_train

    # --- 模型初始化 ---
    if use_advanced:
        models = {
            "LightGBM": LGBMClassifier(random_state=42, n_estimators=100),
            "LogisticRegression": LogisticRegression(max_iter=1000, random_state=42)
        }
    else:
        # 仅使用 sklearn 基础模型
        models = {
            "LogisticRegression": LogisticRegression(max_iter=1000, random_state=42)
        }

    results = {}

    for name, model in models.items():
        print(f"\n正在训练模型: {name} ...")
        
        # 训练
        model.fit(X_train_balanced, y_train_balanced)
        
        # 预测
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1]

        # --- 论文评估指标计算 ---
        accuracy = accuracy_score(y_test, y_pred)
        auc = roc_auc_score(y_test, y_pred_proba)
        recall = recall_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)

        # KS值计算
        from scipy import stats
        good_probs = y_pred_proba[y_test == 0]
        bad_probs = y_pred_proba[y_test == 1]
        ks_stat, _ = stats.ks_2samp(good_probs, bad_probs)

        results[name] = {
            'model': model,
            'accuracy': accuracy,
            'auc': auc,
            'recall': recall,
            'precision': precision,
            'f1': f1,
            'ks': ks_stat
        }

        print(f"{name} - 准确率: {accuracy:.3f}, AUC: {auc:.3f}, 召回率: {recall:.3f}, KS: {ks_stat:.3f}")

    return results

# ================== 4. 主函数 ==================
def main():
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    print(f"当前运行时间戳: {timestamp}\n")
    print("="*60)
    print("上市公司财务舞弊预测模型 (虚拟数据测试)")
    print("="*60)

    # 1. 读取并准备数据
    # 注意:这里的路径是相对于你运行脚本的位置 (file_path由上方setup自动解析)
    df = load_and_prepare_data(file_path)

    # 2. 预处理
    df_processed = preprocess_data(df)

    # 3. 划分数据集
    X = df_processed.drop('FIFR', axis=1)
    y = df_processed['FIFR']

    # 如果特征太少,防止报错
    if X.shape[1] == 0:
        print("错误: 没有找到有效的数值特征列,请检查Excel列名映射是否正确。")
        return

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    # 4. 训练与评估
    results = train_and_evaluate(X_train, X_test, y_train, y_test)

    # 5. 结果展示
    print("\n" + "="*50)
    print(f"模型性能对比 | 运行时间戳: {timestamp}")
    print("="*50)

    comparison_df = pd.DataFrame({
        model: {
            'Accuracy': f"{results[model]['accuracy']:.3f}",
            'AUC': f"{results[model]['auc']:.3f}",
            'Recall': f"{results[model]['recall']:.3f}",
            'Precision': f"{results[model]['precision']:.3f}",
            'KS': f"{results[model]['ks']:.3f}"
        } for model in results
    })

    print(comparison_df.T)

    # --- 可视化 ---
    plt.figure(figsize=(12, 6))
    metric_names = list(results.values())[0].keys()
    metrics_to_plot = [m for m in metric_names if m not in ['model', 'ks']]

    for i, metric in enumerate(metrics_to_plot):
        plt.subplot(2, 3, i+1)
        values = [results[model][metric] for model in results]
        plt.bar(results.keys(), values)
        plt.title(metric)
        plt.ylim(0, 1)

    plt.tight_layout()
    plt.savefig(f'model_results_{timestamp}.png')
    print(f"\n图表已保存为 model_results_{timestamp}.png")

if __name__ == "__main__":
    main()