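A city commercial bank (hereafter "the bank") processed more than 320 million online transactions in 2024, with an average daily transaction value above CNY 12 billion. The traditional rule engine (sender blocking, limit controls) covers only about 35% of fraud scenarios. Most of the remainder consists of newer, more sophisticated schemes, such as "paofen" (score-running) money laundering, cash-out via cryptocurrency exchange, and logins from unfamiliar locations after account takeover, all of which slip through the traditional risk-control net.
The core requirement is clear: score the risk of each transaction with an AI model in real time (<300 ms), let legitimate users through without friction, and precisely block gray and black transactions.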
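| Feature type | Count | Example fields | Latency budget |
|---|---|---|---|
| Basic account information | 18 | Account age, KYC level, age of bound phone | <10 ms |
| Transaction behavior sequence | Dynamic | 30-day transaction frequency, amount distribution, time-of-day patterns | <50 ms |
| Device fingerprint | 42 | Device ID, root probability, emulator signals, IP geolocation | <30 ms |
| Knowledge-graph relations | Dynamic | Number of linked accounts, risk-list involvement | <100 ms |
| External blacklists | Real-time | Public security "card-cutting" (duanka) list, industry shared risk database | <20 ms |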
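The per-group latency budgets in the table suggest fetching the feature groups concurrently, each under its own timeout. The following is a minimal sketch of that idea, not the bank's implementation: the group keys, the `fetchers` callables, and the fall-back-to-default behavior on timeout are all assumptions made for illustration.

```python
import asyncio

# Per-group latency budgets in seconds, taken from the table above.
# The group keys are hypothetical names chosen for this sketch.
BUDGETS_S = {
    "account_profile": 0.010,
    "behavior_sequence": 0.050,
    "device_fingerprint": 0.030,
    "graph_relations": 0.100,
    "external_blacklist": 0.020,
}


async def fetch_with_budget(group, fetcher, default):
    """Return the fetcher's result, or `default` if the group's budget is exceeded."""
    try:
        return await asyncio.wait_for(fetcher(), timeout=BUDGETS_S[group])
    except asyncio.TimeoutError:
        # Assumption: a late feature group degrades to a default instead of failing the request.
        return default


async def gather_features(fetchers, defaults):
    """Fetch all groups concurrently; wall time is bounded by the largest budget."""
    groups = list(fetchers)
    results = await asyncio.gather(
        *(fetch_with_budget(g, fetchers[g], defaults[g]) for g in groups)
    )
    return dict(zip(groups, results))
```

Because the groups run in parallel, the slowest budget (100 ms for graph relations) bounds the feature stage rather than the sum of all five budgets.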
# Real-time feature computation (Flink stream processing example)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define the Kafka topic that carries incoming transactions
t_env.execute_sql("""
    CREATE TABLE transaction_stream (
        tx_id STRING,
        account_id BIGINT,
        amount DECIMAL(18,2),
        merchant_id STRING,
        device_fp STRING,
        ip_address STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'live-transactions',
        'properties.bootstrap.servers' = 'kafka01:9092,kafka02:9092',
        'format' = 'json'
    )
""")

# Rolling features over the past 24 hours of transactions.
# This is an OVER-window aggregation: each row is aggregated together with
# the same account's rows from the preceding 24 hours of event time.
result_table = t_env.sql_query("""
    SELECT
        account_id,
        COUNT(*) OVER w AS tx_count_24h,
        AVG(amount) OVER w AS avg_amount_24h,
        MAX(amount) OVER w AS max_amount_24h,
        -- Derived time features
        HOUR(event_time) AS hour_of_day,
        DAYOFWEEK(event_time) AS day_of_week
    FROM transaction_stream
    WINDOW w AS (
        PARTITION BY account_id
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '24' HOUR PRECEDING AND CURRENT ROW
    )
""")
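The design is a three-stage funnel: rules filter first → a lightweight model screens → a heavier model makes the fine-grained call
## Stage 1: rule engine (blocks obvious fraud)
## - Blacklist hit: block immediately
## - Unfamiliar location + abnormal device: trigger verification
## - Large transfer + newly added payee: manual review
# Rule engine latency: < 5 ms
## Stage 2: lightweight XGBoost model (broad screening)
# Feature dimensions: ≈ 340 (computable in real time)
# Input → output: risk probability in 0-1
# P99 latency: < 30 ms
## Stage 3: deep learning model (fine adjudication, only for stage-2 score ∈ [0.3, 0.7])
# Model: MMoE + Transformer temporal encoder
# Feature dimensions: full set of 2000+ (including graph aggregate features)
# P99 latency: < 250 ms (inference accelerated with ONNX)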
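The routing between the three stages can be sketched as a single function. This is an illustration under stated assumptions: the source only says that stage 3 handles stage-2 scores in [0.3, 0.7], so allowing below 0.3, blocking above 0.7, and the `block_at` cutoff for the deep model are hypothetical choices, as are the callable names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Decision:
    action: str                    # "allow", "block", or an action returned by a rule
    stage: str                     # which stage produced the decision
    score: Optional[float] = None  # model score, if a model was consulted


def decide(tx, rule_check, light_model, deep_model, low=0.3, high=0.7, block_at=0.5):
    """Route one transaction through rules, the light model, then the deep model."""
    # Stage 1: rules return an action string on a hit, or None to pass through.
    rule_action = rule_check(tx)
    if rule_action is not None:
        return Decision(rule_action, "rules")

    # Stage 2: the light model decides on its own outside the uncertain band (assumption).
    score = light_model(tx)
    if score < low:
        return Decision("allow", "xgboost", score)
    if score > high:
        return Decision("block", "xgboost", score)

    # Stage 3: only scores inside [low, high] pay for the expensive model.
    deep_score = deep_model(tx)
    action = "block" if deep_score >= block_at else "allow"
    return Decision(action, "deep", deep_score)
```

Keeping the deep model behind the uncertain band is what lets the stage budgets above (5 ms, 30 ms, 250 ms) fit under the 300 ms target even in the worst case.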
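import xgboost as xgb
from sklearn.metrics import roc_auc_score

# Key issue: class imbalance (fraud samples < 0.1%)
# scale_pos_weight up-weights the positive class. This is class weighting,
# not focal loss; the usual setting is negatives / positives.
total_samples = 3_200_000
positive_samples = 2_800  # about 0.0875%
scale_pos_weight = (total_samples - positive_samples) / positive_samples  # ≈ 1142

params = {
    "objective": "binary:logistic",
    "eval_metric": ["auc", "logloss"],
    "max_depth": 8,
    "learning_rate": 0.05,
    "subsample": 0.8,
    "colsample_bytree": 0.7,
    "scale_pos_weight": scale_pos_weight,
    "tree_method": "hist",  # faster training
    "nthread": 16,
}

# Split by time, never randomly (for example 5-fold time-series cross-validation).
# X_train/y_train and X_val/y_val are assumed to be one such time-ordered fold
# prepared upstream, with the validation rows later than the training rows.
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
model = xgb.train(
    params, dtrain,
    num_boost_round=500,
    evals=[(dtrain, "train"), (dval, "val")],
    early_stopping_rounds=50,
    verbose_eval=50,
)

# AUC typically reaches 0.94-0.96 (a strong result for fraud detection)
# Score with the best iteration found by early stopping, not the last one
val_pred = model.predict(dval, iteration_range=(0, model.best_iteration + 1))
print(f"Validation AUC: {roc_auc_score(y_val, val_pred):.4f}")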
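The training snippet mentions 5-fold time-series cross-validation but shows only a single train/validation pair. One way to produce the folds is an expanding-window split over rows already sorted by event time. The helper below is a hypothetical sketch in plain Python; scikit-learn's `TimeSeriesSplit` offers a comparable split.

```python
def time_series_folds(n_samples, n_splits=5):
    """Yield (train_range, val_range) index ranges over rows sorted by time.

    Each fold trains on everything before a cutoff and validates on the next
    chunk, so validation rows are always later than the training rows.
    """
    fold = n_samples // (n_splits + 1)
    if fold == 0:
        raise ValueError("not enough samples for the requested number of splits")
    for k in range(1, n_splits + 1):
        train_end = fold * k
        # The last fold absorbs any remainder rows.
        val_end = n_samples if k == n_splits else fold * (k + 1)
        yield range(0, train_end), range(train_end, val_end)


folds = list(time_series_folds(12, n_splits=5))
```

Each `(train_range, val_range)` pair can index the time-sorted feature matrix to build the `DMatrix` objects for one fold.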
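Rollout uses shadow scoring: the model computes risk scores in real time but blocks nothing and only writes logs, and the comparison runs continuously for 2 weeks.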
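A shadow-scoring comparison can be summarized from the logs alone. The sketch below assumes each log record carries the production decision and the shadow model's score; the field names `prod_blocked` and `shadow_score`, and the 0.5 threshold, are hypothetical.

```python
from collections import Counter


def shadow_report(records, threshold=0.5):
    """Compare logged shadow scores against what production actually did.

    records: iterable of dicts with 'prod_blocked' (bool) and 'shadow_score' (float).
    Nothing is blocked here; the function only counts agreements and disagreements.
    """
    counts = Counter()
    for record in records:
        shadow_blocked = record["shadow_score"] >= threshold
        counts[(record["prod_blocked"], shadow_blocked)] += 1
    total = sum(counts.values())
    agreed = counts[(True, True)] + counts[(False, False)]
    return {
        "agreement_rate": agreed / total if total else 0.0,
        "shadow_only_blocks": counts[(False, True)],  # model would block, production allowed
        "prod_only_blocks": counts[(True, False)],    # production blocked, model would allow
    }


report = shadow_report([
    {"prod_blocked": True, "shadow_score": 0.9},
    {"prod_blocked": False, "shadow_score": 0.1},
    {"prod_blocked": False, "shadow_score": 0.8},
    {"prod_blocked": True, "shadow_score": 0.2},
])
```

The two disagreement counts are the cases worth manual review before the model is allowed to block anything.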
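# Monitoring script: check PSI (Population Stability Index) daily
import logging

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


def compute_psi(expected, actual, buckets=10, eps=1e-6):
    """Compute the PSI of one feature's distribution to detect drift."""
    # Bin edges come from the baseline sample; open the ends so new extremes still fall in a bin
    _, edges = pd.qcut(expected, buckets, retbins=True, duplicates="drop")
    edges[0], edges[-1] = -np.inf, np.inf
    expected_percents = pd.cut(pd.Series(expected), edges).value_counts(normalize=True, sort=False)
    actual_percents = pd.cut(pd.Series(actual), edges).value_counts(normalize=True, sort=False)
    # Floor empty bins so the log ratio stays finite
    expected_percents = expected_percents.clip(lower=eps)
    actual_percents = actual_percents.clip(lower=eps)
    psi_values = (expected_percents - actual_percents) * \
        np.log(expected_percents / actual_percents)
    return float(psi_values.sum())


# Alert threshold; baseline_feature and today_feature are placeholder samples of one feature
psi_score = compute_psi(baseline_feature, today_feature)
if psi_score > 0.25:
    logger.warning(f"Feature drift! PSI={psi_score:.3f}, trigger retrain")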
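For reference, the quantity the monitoring script computes can be written out as follows, where $e_i$ and $a_i$ are the fractions of the baseline (expected) and current (actual) samples that fall into bin $i$. The two-bin numbers are a made-up worked example, not data from the bank.

$$
\mathrm{PSI} = \sum_{i=1}^{B} \left(e_i - a_i\right)\,\ln\frac{e_i}{a_i}
$$

$$
e = (0.5,\ 0.5),\quad a = (0.6,\ 0.4):\qquad
\mathrm{PSI} = (-0.1)\ln\frac{0.5}{0.6} + (0.1)\ln\frac{0.5}{0.4} \approx 0.0182 + 0.0223 = 0.0405
$$

Every term is non-negative because the difference and the log ratio always share the same sign, so PSI grows only as the two distributions diverge. A value of about 0.04 sits well below the 0.25 alert threshold used in the script.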
The approach has a clear path for vertical extension:
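# SHAP explainability example (to meet regulatory requirements)
import shap

explainer = shap.TreeExplainer(model)
# X_realtime_sample: a sample of scored transactions (assumed to be prepared upstream)
shap_values = explainer.shap_values(X_realtime_sample)
# Output: which features "pushed" the model toward a high-risk verdict
# For regulatory review: shows why the model judged a transaction to be fraud
shap.summary_plot(shap_values, X_realtime_sample, max_display=10)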
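The summary plot gives a view across the whole sample. For a single flagged transaction, one row of per-feature contributions can be turned into a short list of reasons. The helper below is a hypothetical sketch: it assumes one row of `shap_values` holds one contribution per feature, and the feature names in the example are invented.

```python
def top_reasons(feature_names, contributions, k=3):
    """Rank the features of ONE transaction by absolute contribution to the score."""
    ranked = sorted(
        zip(feature_names, contributions),
        key=lambda pair: abs(pair[1]),
        reverse=True,
    )
    return [
        (name, "raises risk" if value > 0 else "lowers risk", value)
        for name, value in ranked[:k]
    ]


reasons = top_reasons(
    ["tx_count_24h", "device_root_prob", "account_age_days"],
    [0.4, 1.2, -0.7],
    k=2,
)
```

Positive contributions push the score toward fraud and negative ones away from it, so the sign is reported alongside the magnitude.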
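Operational parameter reference:
| Metric | Alert threshold | Normal range |
|---|---|---|
| Model response P99 | > 250 ms | < 150 ms |
| Daily rise in feature PSI | > 0.15 | < 0.05 |
| False rejection rate | > 1.2% | < 0.9% |
| Daily fraud detections | Week-over-week change beyond ±40% | Within ±15% |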
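The alert thresholds in the table can be encoded as a small rule set for a daily check. This is a sketch only: the metric keys are hypothetical names, rates are expressed as fractions (1.2% becomes 0.012), and the week-over-week change is a signed fraction.

```python
# Alert rules mirroring the thresholds in the table above.
ALERT_RULES = {
    "model_p99_ms": lambda v: v > 250,
    "feature_psi_daily_rise": lambda v: v > 0.15,
    "false_reject_rate": lambda v: v > 0.012,
    "fraud_detections_wow_change": lambda v: abs(v) > 0.40,
}


def breached(metrics):
    """Return the sorted names of metrics whose alert threshold is exceeded."""
    return sorted(
        name
        for name, rule in ALERT_RULES.items()
        if name in metrics and rule(metrics[name])
    )


alerts = breached({
    "model_p99_ms": 180,
    "feature_psi_daily_rise": 0.2,
    "false_reject_rate": 0.008,
    "fraud_detections_wow_change": -0.45,
})
```

A drop in detections is treated the same as a spike, since a sudden fall can mean the model has stopped catching a pattern rather than that fraud has declined.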