【大数据】Flink SQL 语法篇(六):Temporal Join
《Flink SQL 语法篇》系列,共包含以下 10 篇文章:
- Flink SQL 语法篇(一):CREATE
- Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
- Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
- Flink SQL 语法篇(四):Group 聚合、Over 聚合
- Flink SQL 语法篇(五):Regular Join、Interval Join
- Flink SQL 语法篇(六):Temporal Join
- Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
- Flink SQL 语法篇(八):集合、Order By、Limit、TopN
- Flink SQL 语法篇(九):Window TopN、Deduplication
- Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
Flink SQL 语法篇(六):Temporal Join
- 1.Versioned Table 的两种定义方式
- 1.1 PRIMARY KEY 定义方式
- 1.2 Deduplicate 定义方式
- 2.应用案例
- 2.1 案例一(事件时间)
- 2.2 案例二(处理时间)
Temporal Join 定义(支持 Batch / Streaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 Join 这个 拉链快照表 的 Join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table,使用一个明细表去 Join 这个 Versioned Table 的 Join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table 其实就是对同一条 key(在 DDL 中以 Primary Key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进行 Join。
应用场景:比如常见的汇率数据(实时的根据汇率计算总金额),在 12 : 00 12:00 12:00 之前(事件时间),人民币和美元汇率是 7 : 1 7:1 7:1,在 12 : 00 12:00 12:00 之后变为 6 : 1 6:1 6:1,那么在 12 : 00 12:00 12:00 之前数据就要按照 7 : 1 7:1 7:1 进行计算, 12 : 00 12:00 12:00 之后就要按照 6 : 1 6:1 6:1 计算。在事件时间语义的任务中,事件时间 12 : 00 12:00 12:00 之前的数据,要按照 7 : 1 7:1 7:1 进行计算, 12 : 00 12:00 12:00 之后的数据,要按照 6 : 1 6:1 6:1 进行计算。这其实就是离线中快照的概念,维护具体汇率的表在 Flink SQL 体系中就叫做 Versioned Table。
1.Versioned Table 的两种定义方式
Verisoned Table:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。举一个汇率场景的案例来看一下一个 Versioned Table 的两种定义方式。
1.1 PRIMARY KEY 定义方式
-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到 CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS update_time, -- PRIMARY KEY 定义方式 PRIMARY KEY(currency) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'value.format' = 'debezium-json', /* ... */ );
1.2 Deduplicate 定义方式
-- 定义一个 append-only 的数据源表 CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS update_time ) WITH ( 'connector' = 'kafka', 'value.format' = 'debezium-json', /* ... */ ); -- 将数据源表按照 Deduplicate 方式定义为 Versioned Table CREATE VIEW versioned_rates AS SELECT currency, conversion_rate, update_time -- 1. 定义 `update_time` 为时间字段 FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定义 `currency` 为主键 ORDER BY update_time DESC -- 3. ORDER BY 中必须是时间戳列 ) AS rownum FROM currency_rates) WHERE rownum = 1;
2.应用案例
Temporal Join 支持的时间语义:事件时间、处理时间。
2.1 案例一(事件时间)
-- 1. 定义一个输入订单表 CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); -- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到 CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS update_time, PRIMARY KEY(currency) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'value.format' = 'debezium-json', /* ... */ ); SELECT order_id, price, currency, conversion_rate, order_time, FROM orders -- 3. Temporal Join 逻辑 -- SQL 语法为:FOR SYSTEM_TIME AS OF LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time ON orders.currency = currency_rates.currency;
结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:
order_id price 货币 汇率 order_time ======== ===== ======== =============== ========= o_001 11.11 EUR 1.14 12:00:00 o_002 12.51 EUR 1.10 12:06:00
- 事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark。
- 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中。
2.2 案例二(处理时间)
10:15> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 10:30> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 -- 10:42 时,Euro 的汇率从 114 变为 116 10:52> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Euro 116