数仓分层设计架构:ODS-DWD-DWS-ADS

April . 01 . 2025

数仓分层设计架构:ODS-DWD-DWS-ADS 与实时 / T+1 双链路落地实践

1. 背景

数仓分层的核心价值,是把数据从“原始采集”逐步加工成“可复用、可分析、可服务业务”的数据资产。

经典数仓分层一般包括:

层级 名称 核心职责
ODS Operational Data Store,原始数据层 接入并保留原始业务数据
DWD Data Warehouse Detail,明细数据层 清洗、规范化、事实明细建模
DWS Data Warehouse Summary,汇总数据层 面向主题沉淀公共指标
ADS Application Data Service,应用数据层 面向报表、接口、看板提供数据服务

在实际生产中,数仓通常不会只有一条链路,而是同时存在两类数据链路:

  1. 实时数据链路:本文重点以用户下单风控为例,面向实时特征计算、风控 API 查询和在线决策。
  2. T+1 批量链路:面向日报、经营分析、财务对账、历史回补等场景。

本文结合 RisingWave、Flink CDC、StarRocks、MySQL、Elasticsearch、DolphinScheduler,介绍一套更贴近实战的数仓分层方案。


2. 总体架构

                 ┌─────────────────────────┐
                 │ MySQL / PostgreSQL / MQ │
                 └────────────┬────────────┘
                              │
                           Flink CDC
                              │
          ┌───────────────────┴───────────────────┐
          │                                       │
          v                                       v
   实时风控链路                              T+1 批量链路
   RisingWave                              StarRocks ODS
          │                                       │
          v                                 DolphinScheduler
 MySQL 风控特征库                                │
          │                                       v
          v                                批量 SQL / 脚本
   通用风控 API                                  │
          │                                       v
          v                                StarRocks DWD/DWS/ADS
   风控系统 / 规则引擎

可以简单理解为:

  • 实时链路解决“下单当下风险如何判断”。
  • T+1 链路解决“昨天最终结果是什么”。
  • RisingWave 负责实时 SQL 建模和实时物化视图。
  • MySQL 负责承接风控 API 点查所需的实时特征快照。
  • StarRocks 负责高性能 OLAP 查询、风控复盘和 T+1 应用层数据服务。
  • DolphinScheduler 负责任务编排、依赖调度、失败重试和补数。

3. ODS 层:原始数据层

3.1 职责

ODS 层负责接入业务系统的原始数据,尽量保留业务库原貌。

ODS 层主要职责:

  • 接入业务库 CDC 数据、日志数据、消息数据。
  • 保留原始字段和业务主键。
  • 保留 CDC 变更类型,例如 insert、update、delete。
  • 增加采集时间、同步时间、数据来源等技术字段。
  • 支持数据回放、问题追溯和历史重算。

3.2 示例

订单原始表进入 ODS 后,可以保留如下结构:

CREATE TABLE ods_trade_order (
    order_id BIGINT,
    user_id BIGINT,
    product_id BIGINT,
    order_amount DECIMAL(18,2),
    order_status VARCHAR(32),
    created_at DATETIME,
    updated_at DATETIME,
    op_type VARCHAR(16),
    sync_time DATETIME
);

3.3 最佳实践

ODS 层不建议写复杂业务逻辑,只做必要的标准化处理:

  • 统一字段类型。
  • 统一时间格式。
  • 增加数据来源标识。
  • 保留原始业务主键。
  • 保留变更操作类型。
  • 不提前过滤可能后续会使用的数据。

这样后续如果业务口径变化,可以从 ODS 重新加工。


4. DWD 层:明细数据层

4.1 职责

DWD 层是在 ODS 基础上完成清洗、关联和事实建模后的明细数据层。

主要职责:

  • 过滤无效数据。
  • 统一字段命名。
  • 统一业务状态枚举。
  • 统一金额、时间、地区等口径。
  • 关联用户、商品、门店、组织等维度。
  • 构建业务事实明细表。

DWD 层仍然保持明细粒度,不建议过早聚合。

4.2 示例

订单明细事实表:

CREATE TABLE dwd_trade_order_detail (
    order_id BIGINT,
    user_id BIGINT,
    product_id BIGINT,
    category_id BIGINT,
    order_amount DECIMAL(18,2),
    pay_amount DECIMAL(18,2),
    order_status VARCHAR(32),
    order_time DATETIME,
    pay_time DATETIME,
    province VARCHAR(64),
    city VARCHAR(64)
);

4.3 设计重点

DWD 层要重点解决“数据能不能被复用”的问题。

例如订单状态在业务库中可能是数字枚举:

0 = 待支付
1 = 已支付
2 = 已取消
3 = 已完成

在 DWD 层可以统一转换为清晰的业务状态:

CREATED
PAID
CANCELLED
FINISHED

这样 DWS 和 ADS 层在计算指标时就不用重复理解源系统枚举。


5. DWS 层:汇总数据层

5.1 职责

DWS 层面向业务主题沉淀公共指标,是指标复用的核心层。

常见主题包括:

  • 订单主题。
  • 用户主题。
  • 商品主题。
  • 支付主题。
  • 流量主题。
  • 门店主题。

DWS 层不是面向某一个具体报表,而是面向可复用的分析主题。

5.2 示例

城市每日订单汇总:

CREATE TABLE dws_trade_city_day (
    stat_date DATE,
    province VARCHAR(64),
    city VARCHAR(64),
    order_count BIGINT,
    pay_user_count BIGINT,
    gmv DECIMAL(18,2),
    pay_amount DECIMAL(18,2)
);

5.3 指标口径

DWS 层要重点保证指标口径统一,例如:

  • GMV 是否包含取消订单。
  • 支付金额是否扣除退款。
  • 用户数按下单用户还是支付用户计算。
  • 统计时间按下单时间、支付时间还是完成时间计算。
  • 是否包含测试订单、内部订单、异常订单。

如果 DWS 层没有统一口径,ADS 层和报表层很容易出现“同一个指标,不同报表结果不一致”的问题。


6. ADS 层:应用数据层

6.1 职责

ADS 层直接面向业务应用、BI 报表、实时大屏和 API 服务。

主要职责:

  • 面向具体业务场景组织数据。
  • 提供高性能查询。
  • 封装复杂指标。
  • 降低前端、BI 和业务系统的使用成本。

ADS 层通常不追求通用性,而是围绕具体应用场景做裁剪和组织。

6.2 示例

销售看板 ADS 表:

CREATE TABLE ads_sales_dashboard_day (
    stat_date DATE,
    total_gmv DECIMAL(18,2),
    total_pay_amount DECIMAL(18,2),
    total_order_count BIGINT,
    total_pay_user_count BIGINT,
    top_city VARCHAR(64),
    top_category VARCHAR(64),
    updated_at DATETIME
);

ADS 层可以根据具体查询场景设计:

  • 宽表。
  • 预聚合表。
  • 排行榜表。
  • 趋势表。
  • 明细检索表。
  • 看板汇总表。

7. 实战一:用户下单风控实时链路

7.1 业务场景

这里用“用户下单风控”作为实时链路示例。

典型业务流程如下:

  1. 订单、用户、设备、地址、支付、行为等业务表通过 CDC 实时同步。
  2. RisingWave 基于 CDC 数据持续计算用户、设备、地址等风控指标。
  3. 风控指标结果写入 MySQL 风控特征库。
  4. 用户在业务系统提交订单时,业务系统携带当前订单上下文调用通用风控 API。
  5. 风控 API 查询 MySQL 中的实时特征,并结合当前订单上下文返回决策结果。
  6. 风控系统结合规则引擎或模型结果,判断订单是否放行、拦截、人工审核或二次验证。
  7. 订单落库后再通过 CDC 更新后续实时特征。

链路设计为:

业务库订单表 / 用户表 / 设备表 / 地址表 / 支付表
    │
Flink CDC
    │
RisingWave
    │
实时风控指标计算
    │
MySQL 风控特征库
    │
通用风控 API
    │
风控系统 / 规则引擎 / 模型服务

用户提交订单
    │
业务系统携带当前订单上下文调用风控 API
    │
风控系统返回决策结果

这个链路的核心是把用户行为特征稳定、低延迟地提供给风控决策系统。


7.2 风控实时指标示例

下单风控常见指标包括:

  • 用户近 5 分钟下单次数。
  • 用户近 1 小时下单金额。
  • 用户近 24 小时取消订单次数。
  • 同一设备近 10 分钟下单用户数。
  • 同一收货地址近 1 小时下单次数。
  • 同一手机号近 24 小时绑定账号数。
  • 用户历史拒付、退款、投诉次数。
  • 当前订单金额是否显著高于用户历史均值。
  • 当前下单城市是否与常用城市偏离。

这些指标适合在 RisingWave 中用物化视图持续维护,然后把最新结果同步到 MySQL。

需要注意:下面的 SQL 主要用于表达指标口径。生产环境中要优先使用事件时间窗口、滑动窗口或固定刷新策略,避免把所有逻辑都写成依赖 NOW() 的大范围扫描。

7.3 实时链路分层设计

ODS:实时原始数据接入

通过 Flink CDC 采集业务库 Binlog。

MySQL Binlog -> Flink CDC -> RisingWave Source

ODS 层保留原始业务数据和 CDC 变更信息。风控场景中,建议至少同步以下表:

  • 订单表。
  • 用户表。
  • 支付流水表。
  • 设备绑定表。
  • 登录行为表。
  • 地址簿表。
  • 黑名单、灰名单、白名单表。

DWD:实时明细清洗

在 RisingWave 中使用 SQL 或物化视图完成实时清洗,形成订单风控明细宽表。

CREATE MATERIALIZED VIEW dwd_risk_order_detail AS
SELECT
    o.order_id,
    o.user_id,
    o.device_id,
    o.receiver_phone,
    o.receiver_address_id,
    o.product_id,
    o.order_amount,
    o.order_status,
    o.created_at AS order_time,
    u.register_time,
    u.user_level,
    u.province,
    u.city
FROM ods_trade_order o
LEFT JOIN ods_user u ON o.user_id = u.user_id
WHERE o.order_status IN ('CREATED', 'WAIT_PAY', 'PAID');

DWS:实时风控指标聚合

例如统计用户、设备、地址维度的实时下单特征:

CREATE MATERIALIZED VIEW dws_risk_user_order_feature AS
SELECT
    user_id,
    COUNT(*) FILTER (
        WHERE order_time >= NOW() - INTERVAL '5 minutes'
    ) AS order_cnt_5m,
    COUNT(*) FILTER (
        WHERE order_time >= NOW() - INTERVAL '1 hour'
    ) AS order_cnt_1h,
    SUM(order_amount) FILTER (
        WHERE order_time >= NOW() - INTERVAL '1 hour'
    ) AS order_amount_1h,
    MAX(order_time) AS last_order_time
FROM dwd_risk_order_detail
GROUP BY user_id;
CREATE MATERIALIZED VIEW dws_risk_device_order_feature AS
SELECT
    device_id,
    COUNT(*) FILTER (
        WHERE order_time >= NOW() - INTERVAL '10 minutes'
    ) AS order_cnt_10m,
    COUNT(DISTINCT user_id) FILTER (
        WHERE order_time >= NOW() - INTERVAL '10 minutes'
    ) AS user_cnt_10m,
    MAX(order_time) AS last_order_time
FROM dwd_risk_order_detail
WHERE device_id IS NOT NULL
GROUP BY device_id;
CREATE MATERIALIZED VIEW dws_risk_address_order_feature AS
SELECT
    receiver_address_id,
    COUNT(*) FILTER (
        WHERE order_time >= NOW() - INTERVAL '1 hour'
    ) AS order_cnt_1h,
    COUNT(DISTINCT user_id) FILTER (
        WHERE order_time >= NOW() - INTERVAL '1 hour'
    ) AS user_cnt_1h,
    SUM(order_amount) FILTER (
        WHERE order_time >= NOW() - INTERVAL '1 hour'
    ) AS order_amount_1h
FROM dwd_risk_order_detail
WHERE receiver_address_id IS NOT NULL
GROUP BY receiver_address_id;

ADS:风控特征服务表

ADS 层把多个维度的实时指标合并成一张风控特征快照表。

CREATE MATERIALIZED VIEW ads_risk_order_feature_snapshot AS
SELECT
    o.order_id,
    o.user_id,
    o.device_id,
    o.receiver_address_id,
    o.order_amount,
    o.order_time,
    uf.order_cnt_5m AS user_order_cnt_5m,
    uf.order_cnt_1h AS user_order_cnt_1h,
    uf.order_amount_1h AS user_order_amount_1h,
    df.order_cnt_10m AS device_order_cnt_10m,
    df.user_cnt_10m AS device_user_cnt_10m,
    af.order_cnt_1h AS address_order_cnt_1h,
    af.user_cnt_1h AS address_user_cnt_1h,
    af.order_amount_1h AS address_order_amount_1h,
    NOW() AS feature_updated_at
FROM dwd_risk_order_detail o
LEFT JOIN dws_risk_user_order_feature uf ON o.user_id = uf.user_id
LEFT JOIN dws_risk_device_order_feature df ON o.device_id = df.device_id
LEFT JOIN dws_risk_address_order_feature af ON o.receiver_address_id = af.receiver_address_id;

这张表的结果可以 sink 到 MySQL,由风控 API 按 order_iduser_id + order_id 查询。


8. RisingWave 到 MySQL:风控特征库

8.1 适用场景

RisingWave 到 MySQL 在这个场景中不是为了做分析查询,而是作为在线风控特征库。

链路示例:

RisingWave 风控物化视图 -> MySQL 风控特征表 -> 通用风控 API -> 风控系统

MySQL 表可以按订单维度保存风控特征快照:

CREATE TABLE risk_order_feature_snapshot (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    device_id VARCHAR(128),
    receiver_address_id BIGINT,
    order_amount DECIMAL(18,2),
    user_order_cnt_5m BIGINT,
    user_order_cnt_1h BIGINT,
    user_order_amount_1h DECIMAL(18,2),
    device_order_cnt_10m BIGINT,
    device_user_cnt_10m BIGINT,
    address_order_cnt_1h BIGINT,
    address_user_cnt_1h BIGINT,
    address_order_amount_1h DECIMAL(18,2),
    feature_updated_at DATETIME,
    created_at DATETIME,
    updated_at DATETIME,
    KEY idx_user_id (user_id),
    KEY idx_feature_updated_at (feature_updated_at)
);

如果风控发生在订单落库前,此时可能还没有正式 order_id。可以先使用业务系统生成的 risk_request_id 或预生成订单号作为查询和审计主键,等订单正式落库后再补充 order_id

风控 API 查询示例:

GET /risk/features/order?orderId=10000001

返回示例:

{
  "orderId": 10000001,
  "userId": 8888,
  "userOrderCnt5m": 3,
  "userOrderCnt1h": 8,
  "userOrderAmount1h": 1699.00,
  "deviceOrderCnt10m": 12,
  "deviceUserCnt10m": 5,
  "addressOrderCnt1h": 6,
  "addressUserCnt1h": 4,
  "featureUpdatedAt": "2026-04-24 10:11:12"
}

8.2 这条链路的问题

这条链路可以落地,但需要注意几个关键问题。

第一,如果风控要在“订单提交前”拦截,仅依赖订单表 CDC 是不够的。CDC 通常发生在业务库提交之后,属于异步链路。也就是说,等订单 CDC 进入 RisingWave 再写入 MySQL,已经晚于用户提交订单的同步决策时刻。

第二,风控决策是强在线链路,MySQL 读写压力要严格控制。如果每次下单都触发大量特征更新,并且风控 API 又高并发读取同一张表,MySQL 容易成为瓶颈。

第三,CDC -> RisingWave -> MySQL 存在端到端延迟。用户刚下单时,订单 CDC 事件不一定已经完成计算并写入 MySQL。如果风控系统立即按订单号查询,可能查不到最新特征。

第四,风控需要的是“决策时刻的特征快照”。如果 API 查询到的是几秒后更新过的特征,可能和订单提交时刻不一致,影响规则解释和事后审计。

第五,MySQL 适合做点查,不适合承载复杂特征查询。复杂聚合、历史窗口计算、宽范围扫描都应该放在 RisingWave 或离线链路中完成。

8.3 优化方案

建议把这条链路拆成“同步决策链路”和“异步特征更新链路”。

同步决策链路:

用户提交订单
    │
业务系统携带当前订单上下文调用风控 API
    │
风控 API 查询 MySQL 中的用户 / 设备 / 地址历史实时特征
    │
规则引擎 / 模型服务
    │
返回放行 / 拦截 / 人审 / 二次验证
    │
业务系统根据决策结果决定是否创建或推进订单

异步特征更新链路:

业务库 CDC
    │
Flink CDC
    │
RisingWave 实时计算
    │
MySQL 风控特征快照表
    │
通用风控 API
    │
风控系统

也就是说,当前这笔订单的金额、商品、地址、设备、IP 等上下文应该由业务系统直接传给风控 API;RisingWave + MySQL 主要提供“截至当前时刻之前”的历史实时特征。订单 CDC 进入 RisingWave 后,再更新后续订单可使用的特征。

同时补充以下工程约束:

  • MySQL 只存风控 API 必须点查的结果,不存大宽表历史明细。
  • 风控 API 必须返回 feature_updated_at,风控系统根据特征新鲜度决定是否降级。
  • API 查询不到订单特征时,要有兜底策略,例如查询用户维度特征、使用默认规则、进入人工审核或走低风险放行策略。
  • 风控特征表按 order_iduser_iddevice_id 建索引,但主查询路径要控制在点查或小范围查询。
  • 特征快照要保留一段时间,便于风控审计和规则回放。
  • 对高频更新特征做聚合降频,例如按秒级或 5 秒级刷新,而不是每条事件都立即写 MySQL。
  • 对热点用户、热点设备可以增加缓存层,但缓存必须设置较短 TTL,并携带特征更新时间。

如果风控系统对延迟要求极高,例如要求 50ms 内完成决策,则不建议只依赖 MySQL 查询。可以把关键特征同步到 Redis 或专用在线特征库,MySQL 作为审计和兜底存储。


9. Elasticsearch 与 StarRocks 在风控链路中的位置

9.1 Elasticsearch

Elasticsearch 可以作为风控辅助检索系统,但不建议作为主风控决策特征库。

适合放入 ES 的数据:

  • 风控事件日志。
  • 拦截原因。
  • 用户风险画像。
  • 订单审核记录。
  • 设备、地址、手机号关联明细。

典型用途:

  • 风控运营后台检索。
  • 人工审核查询。
  • 风险事件排查。
  • 关联关系分析入口。

不建议让 ES 承担实时决策中的核心聚合计算。

9.2 StarRocks

StarRocks 更适合风控分析和复盘,不适合放在下单同步决策链路中。

适合放入 StarRocks 的数据:

  • 风控命中明细。
  • 规则命中结果。
  • 模型评分结果。
  • 人工审核结果。
  • 订单最终履约、退款、拒付结果。

典型用途:

  • 风控策略效果分析。
  • 规则命中率分析。
  • 拦截准确率分析。
  • 黑产设备、地址、手机号复盘。
  • T+1 风控报表。

推荐链路:

RisingWave / MySQL 风控结果 -> StarRocks -> 风控分析看板

10. 风控实时链路最佳实践

10.1 区分同步决策链路和异步分析链路

下单风控属于同步决策链路,必须控制延迟和依赖数量。

建议:

  • 同步链路只读取必要特征。
  • 复杂分析放到异步链路。
  • 风控 API 要设置超时时间。
  • API 超时后必须有明确降级策略。

10.2 特征必须有新鲜度

风控 API 返回特征时必须携带更新时间:

{
  "orderId": 10000001,
  "userOrderCnt5m": 3,
  "featureUpdatedAt": "2026-04-24 10:11:12"
}

风控系统可以根据特征新鲜度判断是否可用:

  • 5 秒内:正常使用。
  • 5 到 30 秒:谨慎使用,叠加保守规则。
  • 超过 30 秒:认为特征过期,触发降级策略。

10.3 保留决策快照

风控判断不能只看当前最新特征,还要保留当时决策使用的特征快照。

建议每次风控决策落库:

  • order_id。
  • user_id。
  • rule_version。
  • model_version。
  • feature_snapshot。
  • risk_score。
  • decision_result。
  • decision_reason。
  • decision_time。

这样后续才能解释为什么某个订单被拦截或放行。

10.4 避免 MySQL 承担大规模历史计算

MySQL 在这条链路中只适合点查和小范围查询。

不建议在 MySQL 中做:

  • 近 30 天大窗口聚合。
  • 多表复杂 Join。
  • 高并发模糊检索。
  • 大范围分页查询。

这些计算应该前置到 RisingWave、StarRocks 或离线 T+1 链路。

10.5 建立端到端监控

风控实时链路至少要监控:

  • CDC 延迟。
  • RisingWave 计算延迟。
  • Sink 到 MySQL 的写入延迟。
  • MySQL 表更新时间。
  • 风控 API P95 / P99 延迟。
  • API 查询空结果比例。
  • 特征过期比例。
  • 风控决策降级比例。

对于风控系统来说,链路是否“稳定新鲜”比单次是否“查得到”更重要。


11. 实时链路与 T+1 链路的口径关系

风控实时链路和 T+1 链路也需要对账。

实时链路提供的是下单当下的风险判断依据,T+1 链路提供的是次日复盘和策略优化依据。

例如:

  • 实时链路判断某订单是否疑似风险。
  • T+1 链路统计这些订单后续是否退款、拒付、投诉或人工审核确认风险。
  • 策略团队基于 T+1 结果调整实时规则和模型。

因此风控数据也建议形成闭环:

实时特征 -> 实时决策 -> 决策结果 -> 履约结果 -> T+1 复盘 -> 策略优化

这个闭环比单纯“实时算几个指标”更重要。


12. 实战二:T+1 批量数据链路

12.1 业务场景

T+1 链路主要服务对准确性、完整性要求更高的场景,例如:

  • 经营日报。
  • 财务报表。
  • 管理驾驶舱。
  • 历史趋势分析。
  • 数据对账。
  • 指标重算和补数。

本文中的 T+1 链路设计为:

业务库
    │
Flink CDC
    │
StarRocks ODS
    │
DolphinScheduler
    │
批量 SQL / 脚本
    │
StarRocks DWD / DWS / ADS

这里 Flink CDC 负责把业务库数据实时或准实时同步到 StarRocks 的 ODS 层,DolphinScheduler 负责编排每天的批量加工任务。


13. T+1 链路分层设计

13.1 ODS:Flink CDC 同步到 StarRocks

业务库通过 Flink CDC 同步到 StarRocks ODS 表。

MySQL Binlog -> Flink CDC -> StarRocks ODS

ODS 表尽量贴近源表结构。

CREATE TABLE ods_trade_order (
    order_id BIGINT,
    user_id BIGINT,
    product_id BIGINT,
    order_amount DECIMAL(18,2),
    order_status VARCHAR(32),
    created_at DATETIME,
    updated_at DATETIME,
    deleted_flag TINYINT,
    sync_time DATETIME
)
PRIMARY KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 32;

对于 CDC 同步,StarRocks 中建议根据业务主键设计 Primary Key 表,方便处理更新和删除。

13.2 DWD:批量清洗明细层

每天凌晨由 DolphinScheduler 调度 SQL 脚本,把前一天数据加工成 DWD 明细表。

INSERT OVERWRITE dwd_trade_order_detail PARTITION(p20260423)
SELECT
    o.order_id,
    o.user_id,
    o.product_id,
    p.category_id,
    o.order_amount,
    CASE
        WHEN o.order_status = 'PAID' THEN o.order_amount
        ELSE 0
    END AS pay_amount,
    o.order_status,
    o.created_at AS order_time,
    u.province,
    u.city
FROM ods_trade_order o
LEFT JOIN ods_user u ON o.user_id = u.user_id
LEFT JOIN ods_product p ON o.product_id = p.product_id
WHERE o.created_at >= '2026-04-23 00:00:00'
  AND o.created_at <  '2026-04-24 00:00:00'
  AND o.deleted_flag = 0;

DWD 层重点保证明细数据干净、稳定、可追溯。

13.3 DWS:批量汇总公共指标

DWS 层按主题汇总公共指标。

INSERT OVERWRITE dws_trade_city_day PARTITION(p20260423)
SELECT
    DATE(order_time) AS stat_date,
    province,
    city,
    COUNT(*) AS order_count,
    COUNT(DISTINCT user_id) AS pay_user_count,
    SUM(order_amount) AS gmv,
    SUM(pay_amount) AS pay_amount
FROM dwd_trade_order_detail
WHERE order_time >= '2026-04-23 00:00:00'
  AND order_time <  '2026-04-24 00:00:00'
GROUP BY
    DATE(order_time),
    province,
    city;

DWS 层产出的数据可以被多个 ADS 场景复用。

13.4 ADS:面向报表和应用

ADS 层根据具体业务看板组织数据。

INSERT OVERWRITE ads_sales_daily_report PARTITION(p20260423)
SELECT
    stat_date,
    SUM(gmv) AS total_gmv,
    SUM(pay_amount) AS total_pay_amount,
    SUM(order_count) AS total_order_count,
    SUM(pay_user_count) AS total_pay_user_count
FROM dws_trade_city_day
WHERE stat_date = '2026-04-23'
GROUP BY stat_date;

ADS 层面向消费端设计,核心目标是查询简单、性能稳定、口径清晰。


14. DolphinScheduler 调度设计

T+1 链路可以拆成多个任务节点:

1. ods_check
2. dwd_trade_order_detail
3. dws_trade_city_day
4. ads_sales_daily_report
5. data_quality_check
6. report_notify

任务依赖关系:

ods_check
   │
   v
dwd_trade_order_detail
   │
   v
dws_trade_city_day
   │
   v
ads_sales_daily_report
   │
   v
data_quality_check
   │
   v
report_notify

14.1 调度建议

建议:

  • 每天凌晨业务低峰期执行。
  • 先检查 ODS 数据是否同步完成。
  • DWD、DWS、ADS 分层拆任务。
  • 每层任务支持重跑。
  • 每个任务使用业务日期参数。
  • 失败后告警到企业微信、飞书或邮件。
  • 核心任务设置超时时间和失败重试次数。

日期参数示例:

biz_date = ${system.biz.date}
start_time = ${biz_date} 00:00:00
end_time = ${biz_date + 1} 00:00:00

14.2 任务拆分原则

不要把所有 SQL 都写在一个大脚本里。

推荐按以下粒度拆分:

  • 一个业务主题一个工作流。
  • 一个数仓层级一个或多个任务。
  • 一个核心产出表一个任务。
  • 数据质量校验单独成任务。
  • 通知和下游触发单独成任务。

这样方便失败定位、任务重跑和血缘分析。


15. 批量脚本最佳实践

15.1 所有脚本必须可重跑

T+1 任务一定要支持幂等重跑。

推荐方式:

INSERT OVERWRITE table_name PARTITION(partition_name)
SELECT ...

或者:

DELETE FROM table_name WHERE stat_date = '${biz_date}';

INSERT INTO table_name
SELECT ...

不建议直接无条件 INSERT INTO,否则重跑会产生重复数据。

15.2 按业务日期分区

StarRocks 中建议按日期分区:

PARTITION BY date_trunc('day', stat_date)

好处:

  • 查询裁剪分区。
  • 重跑某一天数据成本低。
  • 方便生命周期管理。
  • 方便历史补数。
  • 降低全表扫描风险。

15.3 ODS 到 DWD 使用明确口径

T+1 报表通常更关注“截至昨日结束时的最终状态”,因此需要明确使用哪种统计口径:

  • 创建时间口径。
  • 支付时间口径。
  • 完成时间口径。
  • 更新时间口径。
  • 快照日期口径。

不同口径不能混用。

例如订单 GMV 如果按支付时间统计,就不应该在同一个指标中混入创建时间过滤。

15.4 建立数据质量校验

每个核心任务完成后建议做数据质量检查:

  • 总行数是否异常。
  • 金额是否为负。
  • 主键是否重复。
  • 核心字段是否为空。
  • 与昨日环比是否波动过大。
  • ODS 与 DWD 行数是否大幅不一致。
  • DWS 与 ADS 指标是否能对齐。

示例:

SELECT COUNT(*)
FROM dwd_trade_order_detail
WHERE stat_date = '${biz_date}';

SELECT COUNT(*)
FROM dwd_trade_order_detail
WHERE stat_date = '${biz_date}'
  AND order_id IS NULL;

质量校验不通过时,不应继续产出 ADS 报表。


16. 实时链路与 T+1 链路的关系

实时链路和 T+1 链路不是互相替代,而是互相补充。

对比项 实时链路 T+1 链路
目标 低延迟 高准确性
典型场景 下单风控、实时特征、在线决策 日报、财务、经营分析、风控复盘
计算方式 流式计算 批量计算
核心组件 RisingWave、Flink CDC DolphinScheduler、StarRocks
数据修正 持续更新 批量重算
查询服务 MySQL 风控特征库、通用风控 API StarRocks
关注点 延迟、特征新鲜度、API 稳定性、降级策略 幂等、补数、质量校验、策略复盘

一个成熟的数据平台通常会采用类似 Lambda 架构的思想:

实时链路:提供秒级实时特征和在线决策依据。
T+1 链路:提供最终确认后的权威数据和策略复盘依据。

在业务展示上可以明确区分:

  • 今日风控实时特征:来自实时链路。
  • 昨日及历史数据:来自 T+1 链路。
  • 风控策略效果、财务和结算类指标:优先使用 T+1 链路。

17. StarRocks 建模建议

17.1 ODS 层

ODS 层建议使用 Primary Key 表承接 CDC。

适合场景:

  • 需要处理 update/delete。
  • 按主键查询。
  • 保留最新业务状态。

示例:

PRIMARY KEY(order_id)
DISTRIBUTED BY HASH(order_id)

17.2 DWD 层

DWD 层可以使用 Duplicate Key 或 Primary Key,取决于是否需要更新。

建议:

  • 明细追加型事实表使用 Duplicate Key。
  • 状态会变化的事实表使用 Primary Key。
  • 大表按日期分区。
  • 常用过滤字段进入排序键或分桶设计。

17.3 DWS / ADS 层

DWS 和 ADS 层可以根据查询方式选择:

  • Aggregate Key:适合固定聚合指标。
  • Duplicate Key:适合灵活查询。
  • Primary Key:适合频繁更新的结果表。

常见优化:

  • 按日期分区。
  • 按高频过滤字段分桶。
  • 排序键贴近查询条件。
  • 避免 ADS 表过度宽泛。
  • 热数据保留更细粒度,冷数据做汇总归档。

18. 行业内最佳实践

18.1 指标口径统一管理

建议建立指标字典,统一维护:

  • 指标名称。
  • 指标英文名。
  • 计算逻辑。
  • 时间口径。
  • 过滤条件。
  • 数据来源。
  • 负责人。
  • 是否实时指标。
  • 是否财务确认指标。

否则不同报表会出现“同一个 GMV,多个结果”的问题。

18.2 分层命名规范

推荐命名方式:

ods_业务域_表名
dwd_业务域_事实表
dws_业务域_主题_周期
ads_应用场景_报表名

例如:

ods_trade_order
dwd_trade_order_detail
dws_trade_city_day
ads_dashboard_sales_day

18.3 数据任务必须可观测

生产环境中要监控:

  • 任务是否成功。
  • 数据是否产出。
  • 数据量是否异常。
  • 延迟是否异常。
  • 下游是否消费成功。
  • 查询是否变慢。
  • 数据质量是否达标。

实时链路看延迟,批量链路看产出和质量。

18.4 建立补数机制

数据平台一定会遇到补数场景,例如:

  • 源系统修复历史数据。
  • CDC 中断。
  • 业务口径调整。
  • 脚本逻辑修复。
  • 历史报表重算。

因此所有 T+1 脚本都应该支持传入业务日期:

run.sh 2026-04-23

不要把日期写死在脚本中。

18.5 实时决策与离线结果要对账

实时链路产出的风控决策结果,到了次日应该和 T+1 链路中的履约、退款、拒付、投诉、人工审核结果做对账。

例如:

实时链路在 2026-04-23 拦截的高风险订单
vs
T+1 链路中这些订单的实际风险确认结果

如果差异超过阈值,需要告警并分析原因。

常见原因包括:

  • CDC 或 Sink 延迟。
  • 特征计算窗口不合理。
  • 规则版本和模型版本不一致。
  • 风控 API 查询到过期特征。
  • 履约、退款、拒付结果晚到。
  • CDC 丢失。
  • 实时特征口径和 T+1 复盘口径不同。
  • 用户、设备、地址等维表更新不一致。

19. 总结

ODS-DWD-DWS-ADS 分层解决的是数据资产组织问题,而 RisingWave、Flink CDC、StarRocks、DolphinScheduler 解决的是工程落地问题。

一套实用的数据架构可以这样划分:

实时链路:
Flink CDC -> RisingWave -> MySQL 风控特征库 -> 通用风控 API -> 风控系统

T+1 链路:
Flink CDC -> StarRocks ODS -> DolphinScheduler -> StarRocks DWD/DWS/ADS

实时链路关注低延迟、特征新鲜度和 API 稳定性,适合用户下单风控这类在线决策场景。

T+1 链路关注准确性和可重跑,适合日报、经营分析、财务统计、风控复盘和历史补数。

最终,实时链路提供“当前这笔订单风险如何判断”,T+1 链路提供“这批决策最终效果如何”。两条链路结合,才能同时满足业务对实时性、准确性和可追溯性的要求。


20. 参考资料