【大数据】Flink SQL 语法篇(六):Temporal Join

03-01 1357阅读

《Flink SQL 语法篇》系列,共包含以下 10 篇文章:

【大数据】Flink SQL 语法篇(六):Temporal Join
(图片来源网络,侵删)
  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

    😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

    Flink SQL 语法篇(六):Temporal Join

    • 1.Versioned Table 的两种定义方式
      • 1.1 PRIMARY KEY 定义方式
      • 1.2 Deduplicate 定义方式
      • 2.应用案例
        • 2.1 案例一(事件时间)
        • 2.2 案例二(处理时间)

          Temporal Join 定义(支持 Batch / Streaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 Join 这个 拉链快照表 的 Join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table,使用一个明细表去 Join 这个 Versioned Table 的 Join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table 其实就是对同一条 key(在 DDL 中以 Primary Key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进行 Join。

          应用场景:比如常见的汇率数据(实时的根据汇率计算总金额),在 12 : 00 12:00 12:00 之前(事件时间),人民币和美元汇率是 7 : 1 7:1 7:1,在 12 : 00 12:00 12:00 之后变为 6 : 1 6:1 6:1,那么在 12 : 00 12:00 12:00 之前数据就要按照 7 : 1 7:1 7:1 进行计算, 12 : 00 12:00 12:00 之后就要按照 6 : 1 6:1 6:1 计算。在事件时间语义的任务中,事件时间 12 : 00 12:00 12:00 之前的数据,要按照 7 : 1 7:1 7:1 进行计算, 12 : 00 12:00 12:00 之后的数据,要按照 6 : 1 6:1 6:1 进行计算。这其实就是离线中快照的概念,维护具体汇率的表在 Flink SQL 体系中就叫做 Versioned Table。

          1.Versioned Table 的两种定义方式

          Verisoned Table:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。举一个汇率场景的案例来看一下一个 Versioned Table 的两种定义方式。

          1.1 PRIMARY KEY 定义方式

          -- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
          CREATE TABLE currency_rates (
              currency STRING,
              conversion_rate DECIMAL(32, 2),
              update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
              WATERMARK FOR update_time AS update_time,
              -- PRIMARY KEY 定义方式
              PRIMARY KEY(currency) NOT ENFORCED
          ) WITH (
             'connector' = 'kafka',
             'value.format' = 'debezium-json',
             /* ... */
          );
          

          1.2 Deduplicate 定义方式

          -- 定义一个 append-only 的数据源表
          CREATE TABLE currency_rates (
              currency STRING,
              conversion_rate DECIMAL(32, 2),
              update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
              WATERMARK FOR update_time AS update_time
          ) WITH (
              'connector' = 'kafka',
              'value.format' = 'debezium-json',
              /* ... */
          );
          -- 将数据源表按照 Deduplicate 方式定义为 Versioned Table
          CREATE VIEW versioned_rates AS
          SELECT currency, conversion_rate, update_time   -- 1. 定义 `update_time` 为时间字段
            FROM (
                SELECT *,
                ROW_NUMBER() OVER (PARTITION BY currency  -- 2. 定义 `currency` 为主键
                   ORDER BY update_time DESC              -- 3. ORDER BY 中必须是时间戳列
                ) AS rownum 
                FROM currency_rates)
          WHERE rownum = 1; 
          

          2.应用案例

          Temporal Join 支持的时间语义:事件时间、处理时间。

          2.1 案例一(事件时间)

          -- 1. 定义一个输入订单表
          CREATE TABLE orders (
              order_id    STRING,
              price       DECIMAL(32,2),
              currency    STRING,
              order_time  TIMESTAMP(3),
              WATERMARK FOR order_time AS order_time
          ) WITH (/* ... */);
          -- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
          CREATE TABLE currency_rates (
              currency STRING,
              conversion_rate DECIMAL(32, 2),
              update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
              WATERMARK FOR update_time AS update_time,
              PRIMARY KEY(currency) NOT ENFORCED
          ) WITH (
             'connector' = 'kafka',
             'value.format' = 'debezium-json',
             /* ... */
          );
          SELECT 
               order_id,
               price,
               currency,
               conversion_rate,
               order_time,
          FROM orders
          -- 3. Temporal Join 逻辑
          -- SQL 语法为:FOR SYSTEM_TIME AS OF
          LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
          ON orders.currency = currency_rates.currency;
          

          结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:

          order_id  price  货币       汇率             order_time
          ========  =====  ========  ===============  =========
          o_001     11.11  EUR       1.14             12:00:00
          o_002     12.51  EUR       1.10             12:06:00
          
          • 事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark。
          • 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中。

            2.2 案例二(处理时间)

            10:15> SELECT * FROM LatestRates;
            currency   rate
            ======== ======
            US Dollar   102
            Euro        114
            Yen           1
            10:30> SELECT * FROM LatestRates;
            currency   rate
            ======== ======
            US Dollar   102
            Euro        114
            Yen           1
            -- 10:42 时,Euro 的汇率从 114 变为 116
            10:52> SELECT * FROM LatestRates;
            currency   rate
            ======== ======
            US Dollar   102
            Euro        116     
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]