Flink SQL in Practice


Environment Setup

Option 1: SQL Client on a standalone Flink cluster

Start the Flink cluster and open the SQL client:

[hadoop@node2 ~]$ start-cluster.sh
[hadoop@node2 ~]$ sql-client.sh
...
(log output omitted)
...
Flink SQL>

Option 2: SQL Client on a YARN-session Flink cluster

Start the Hadoop cluster:

[hadoop@node2 ~]$ myhadoop.sh start

Start a Flink cluster as a YARN session:

[hadoop@node2 ~]$ yarn-session.sh -d

Start a SQL client attached to the YARN session:

[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
...
(log output omitted)
...
Flink SQL>

Once the "Flink SQL>" prompt appears, the Flink SQL client has started successfully and you can run SQL statements.

Note: pick one of the two options above and use it for everything that follows.

Database Operations

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> create database mydatabase;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
|       mydatabase |
+------------------+
2 rows in set

Flink SQL> show current database;
+-----------------------+
| current database name |
+-----------------------+
|      default_database |
+-----------------------+
1 row in set

Switch the current database:

Flink SQL> use mydatabase;
[INFO] Execute statement succeed.

Flink SQL> show current database;
+-----------------------+
| current database name |
+-----------------------+
|            mydatabase |
+-----------------------+
1 row in set

Exit the SQL client:

Flink SQL> quit;
...
[hadoop@node2 ~]$

Table DDL Operations

Creating tables

CREATE TABLE

Create the test table:

CREATE TABLE test(
    id INT, 
    ts BIGINT, 
    vc INT
) WITH (
    'connector' = 'print'
);
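Not part of the original walkthrough, but a quick way to exercise a print table: insert a couple of literal rows. Each row should show up in the TaskManager stdout as a +I record (the ts values below are arbitrary placeholders):

INSERT INTO test VALUES (1, 1700000000000, 45), (2, 1700000001000, 52);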

LIKE

Create test1 based on test, adding a value column:

CREATE TABLE test1 (
    `value` STRING
)
LIKE test;

List tables:

show tables;

Describe the test table:

desc test;

Describe the test1 table:

desc test1;

Session transcript:

Flink SQL> CREATE TABLE test(
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH (
> 'connector' = 'print'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE test1 (
>     `value` STRING
> )
> LIKE test;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+------------+
| table name |
+------------+
|       test |
|      test1 |
+------------+
2 rows in set

Flink SQL> desc test;
+------+--------+------+-----+--------+-----------+
| name |   type | null | key | extras | watermark |
+------+--------+------+-----+--------+-----------+
|   id |    INT | TRUE |     |        |           |
|   ts | BIGINT | TRUE |     |        |           |
|   vc |    INT | TRUE |     |        |           |
+------+--------+------+-----+--------+-----------+
3 rows in set

Flink SQL> desc test1;
+-------+--------+------+-----+--------+-----------+
|  name |   type | null | key | extras | watermark |
+-------+--------+------+-----+--------+-----------+
|    id |    INT | TRUE |     |        |           |
|    ts | BIGINT | TRUE |     |        |           |
|    vc |    INT | TRUE |     |        |           |
| value | STRING | TRUE |     |        |           |
+-------+--------+------+-----+--------+-----------+
4 rows in set

CTAS

CTAS stands for CREATE TABLE AS SELECT:

create table test2 as select id, ts from test;

However, this does not work with the print connector, because print can only act as a sink, not as a source:

Flink SQL> create table test2 as select id, ts from test;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.

Altering tables

Rename a table:

alter table test1 rename to test11;

Session transcript:

Flink SQL> alter table test1 rename to test11;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+------------+
| table name |
+------------+
|       test |
|     test11 |
+------------+
2 rows in set

Add a column

Create the test2 table:

CREATE TABLE test2(
    id INT, 
    ts BIGINT, 
    vc INT
) WITH (
    'connector' = 'print'
);

Describe the test2 table:

desc test2;

Add a column and place it first:

ALTER TABLE test2 ADD `status` INT COMMENT 'status descriptor' FIRST;

Describe the test2 table again:

desc test2;

Session transcript:

Flink SQL> CREATE TABLE test2(
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH (
> 'connector' = 'print'
> );
[INFO] Execute statement succeed.

Flink SQL> desc test2;
+------+--------+------+-----+--------+-----------+
| name |   type | null | key | extras | watermark |
+------+--------+------+-----+--------+-----------+
|   id |    INT | TRUE |     |        |           |
|   ts | BIGINT | TRUE |     |        |           |
|   vc |    INT | TRUE |     |        |           |
+------+--------+------+-----+--------+-----------+
3 rows in set

Flink SQL> ALTER TABLE test2 ADD `status` INT COMMENT 'status descriptor' FIRST;
[INFO] Execute statement succeed.

Flink SQL> desc test2;
+--------+--------+------+-----+--------+-----------+-------------------+
|   name |   type | null | key | extras | watermark |           comment |
+--------+--------+------+-----+--------+-----------+-------------------+
| status |    INT | TRUE |     |        |           | status descriptor |
|     id |    INT | TRUE |     |        |           |                   |
|     ts | BIGINT | TRUE |     |        |           |                   |
|     vc |    INT | TRUE |     |        |           |                   |
+--------+--------+------+-----+--------+-----------+-------------------+
4 rows in set

Modify columns

Modify column types and comments:

ALTER TABLE test2 MODIFY (vc DOUBLE NOT NULL, status STRING COMMENT 'status desc');

Describe the table:

desc test2;

Session transcript:

Flink SQL> ALTER TABLE test2 MODIFY (vc DOUBLE NOT NULL, status STRING COMMENT 'status desc');
[INFO] Execute statement succeed.

Flink SQL> desc test2;
+--------+--------+-------+-----+--------+-----------+-------------+
|   name |   type |  null | key | extras | watermark |     comment |
+--------+--------+-------+-----+--------+-----------+-------------+
| status | STRING |  TRUE |     |        |           | status desc |
|     id |    INT |  TRUE |     |        |           |             |
|     ts | BIGINT |  TRUE |     |        |           |             |
|     vc | DOUBLE | FALSE |     |        |           |             |
+--------+--------+-------+-----+--------+-----------+-------------+
4 rows in set

Drop columns

Drop columns:

ALTER TABLE test2 DROP (ts, status);

Describe the table:

desc test2;

Session transcript:

Flink SQL> ALTER TABLE test2 DROP (ts, status);
[INFO] Execute statement succeed.

Flink SQL> desc test2;
+------+--------+-------+-----+--------+-----------+
| name |   type |  null | key | extras | watermark |
+------+--------+-------+-----+--------+-----------+
|   id |    INT |  TRUE |     |        |           |
|   vc | DOUBLE | FALSE |     |        |           |
+------+--------+-------+-----+--------+-----------+
2 rows in set

Dropping tables

Syntax:

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

Example:

drop table if exists test2;

Session transcript:

Flink SQL> drop table if exists test2;
[INFO] Execute statement succeed.

Table DML: Queries

SELECT

SELECT basics and result display modes:

SELECT 'Hello World', 'It''s me';

Note: string literals after SELECT must be wrapped in single quotes; to include a single quote inside a string, double it (so 'It's me' is written 'It''s me').

The result is displayed in the default table mode (screenshot omitted). Press q to return to the command line.

Setting the result display mode

As seen above, the default display mode is table; it can also be set to tableau or changelog.

  • Set the display mode to tableau:

    SET sql-client.execution.result-mode=tableau;

    Session transcript:

    Flink SQL> SET sql-client.execution.result-mode=tableau;
    [INFO] Execute statement succeed.

    Flink SQL> SELECT 'Hello World', 'It''s me';
    ...
    (log output omitted)
    ...

    +----+--------------------------------+--------------------------------+
    | op |                         EXPR$0 |                         EXPR$1 |
    +----+--------------------------------+--------------------------------+
    | +I |                    Hello World |                        It's me |
    +----+--------------------------------+--------------------------------+
    Received a total of 1 row

  • Set the display mode to changelog:

    SET sql-client.execution.result-mode=changelog;

    Session transcript:

    Flink SQL> SET sql-client.execution.result-mode=changelog;
    [INFO] Execute statement succeed.

    Flink SQL> SELECT 'Hello World', 'It''s me';
    ...
    (log output omitted)
    ...

    (screenshot of the changelog output omitted)

Set whichever display mode you prefer for the rest of the walkthrough.

Source tables

Create a source table backed by the datagen connector:

CREATE TABLE source ( 
    id INT, 
    ts BIGINT, 
    vc INT
) WITH ( 
    'connector' = 'datagen', 
    'rows-per-second'='1', 
    'fields.id.kind'='random', 
    'fields.id.min'='1', 
    'fields.id.max'='10', 
    'fields.ts.kind'='sequence', 
    'fields.ts.start'='1', 
    'fields.ts.end'='1000000', 
    'fields.vc.kind'='random', 
    'fields.vc.min'='1', 
    'fields.vc.max'='100'
);

Query the source table:

select * from source;

The query streams one generated row per second (screenshot omitted). Press Ctrl+C to stop the query.

Expressions in the SELECT list work as usual:

SELECT id, vc + 10 FROM source;

Sink tables

Create a sink table:

CREATE TABLE sink (
    id INT, 
    ts BIGINT, 
    vc INT
) WITH (
    'connector' = 'print'
);

Insert the source data into the sink table:

INSERT INTO sink select * from source;

Querying the sink table directly fails:

Flink SQL> select * from sink;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.

To see the output, open the YARN ResourceManager UI on port 8088, follow the ApplicationMaster link into the Flink Web UI, find the running job, and check the stdout of its TaskManager. Cancel the job from the Web UI when you are done.

SELECT with WHERE

SELECT id FROM source WHERE id > 5;

The WITH clause

WITH provides a way to write auxiliary statements for use in a larger query. These statements are commonly called common table expressions (CTEs); think of them as temporary views that exist only for the duration of a single query.

WITH source_with_total AS (
    SELECT id, vc+10 AS total
    FROM source
)
SELECT id, SUM(total)
FROM source_with_total
GROUP BY id;

The aggregation output updates continuously as new rows arrive (screenshot omitted).

Group aggregation

SELECT vc, COUNT(*) as cnt FROM source GROUP BY vc;

In the changelog output, -U marks a retraction: when a group's aggregate changes, the old result row is retracted (-U) and the updated row is emitted (+U).
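To make the retract behavior concrete, here is a hand-written illustration (not captured from a real run) of how the changelog for one group evolves as rows with vc = 5 arrive:

+I[5, 1]    -- first row with vc=5: cnt becomes 1
-U[5, 1]    -- previous result retracted
+U[5, 2]    -- second row with vc=5: cnt updated to 2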

Create the source1 table:

CREATE TABLE source1 (
    dim STRING,
    user_id BIGINT,
    price BIGINT,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.dim.length' = '1',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '100000',
    'fields.price.min' = '1',
    'fields.price.max' = '100000'
);

Create the sink1 table:

CREATE TABLE sink1 (
    dim STRING,
    pv BIGINT,
    sum_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    uv BIGINT,
    window_start bigint
) WITH (
    'connector' = 'print'
);

Aggregate source1 by group and insert the result into sink1:

insert into sink1
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- count of distinct users (uv)
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source1
group by
dim,
-- UNIX_TIMESTAMP returns a second-level timestamp; dividing by 60 buckets it into 1-minute windows
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);

Check the results in the TaskManager stdout (screenshot omitted), then cancel the job in the Web UI.

Multidimensional analysis

Group aggregation also supports GROUPING SETS, ROLLUP, and CUBE. The following example uses GROUPING SETS:

SELECT
  supplier_id
, rating
, product_id
, COUNT(*)
FROM (
VALUES
  ('supplier1', 'product1', 4),
  ('supplier1', 'product2', 3),
  ('supplier2', 'product3', 3),
  ('supplier2', 'product4', 4)
)
-- supplier id, product id, rating
AS Products(supplier_id, product_id, rating)  
GROUP BY GROUPING SETS(
  (supplier_id, product_id, rating),
  (supplier_id, product_id),
  (supplier_id, rating),
  (supplier_id),
  (product_id, rating),
  (product_id),
  (rating),
  ()
);

Each grouping set contributes its own aggregate rows to the result (screenshot omitted).

Group window aggregation

Prepare the data:

CREATE TABLE ws (
  id INT,
  vc INT,
  pt AS PROCTIME(), -- processing time
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), -- event time
  WATERMARK FOR et AS et - INTERVAL '5' SECOND   -- watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.min' = '1',
  'fields.id.max' = '3',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);

Tumbling window

Tumbling window (time attribute column, window size):

select  
id,
TUMBLE_START(et, INTERVAL '5' SECOND)  wstart,
TUMBLE_END(et, INTERVAL '5' SECOND)  wend,
sum(vc) sumVc
from ws
group by id, TUMBLE(et, INTERVAL '5' SECOND);

In the output, rows are aggregated per id and the window size (wend - wstart) is 5 seconds. Press Q to quit the query.

Sliding window

Sliding window (time attribute column, slide step, window size):

select  
id,
HOP_START(pt, INTERVAL '3' SECOND, INTERVAL '5' SECOND)  wstart,
HOP_END(pt, INTERVAL '3' SECOND, INTERVAL '5' SECOND)  wend,
sum(vc) sumVc
from ws
group by id, HOP(pt, INTERVAL '3' SECOND, INTERVAL '5' SECOND);

The output shows a window size of 5 seconds, with consecutive windows for the same id sliding by 3 seconds.

Session window

Session window (time attribute column, session gap):

select  
id,
SESSION_START(et, INTERVAL '5' SECOND)  wstart,
SESSION_END(et, INTERVAL '5' SECOND)  wend,
sum(vc) sumVc
from ws
group by id, SESSION(et, INTERVAL '5' SECOND);

Because the datagen source emits rows continuously, a 5-second gap with no data never occurs, so the sessions never close.

Note: group windows have largely been superseded by the more powerful TVF windows.

Window table-valued function (TVF) aggregation

Compared with group windows (GroupWindow), window TVFs are more efficient and more powerful. They:

• enable more performance optimizations

• support the GROUPING SETS syntax

• allow Top-N inside window aggregation

• provide cumulative windows

A window TVF returns a table, so the window call appears in the FROM clause, and GROUP BY can use the columns the window adds, window_start and window_end:

FROM TABLE(
  window_type(TABLE table_name, DESCRIPTOR(time_column), INTERVAL ...)
)
GROUP BY [window_start,][window_end,] ... -- optional

Tumbling window

SELECT 
window_start, 
window_end, 
id, SUM(vc) sumVC
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, id;

In the output, the first window for id 2 covers [35, 40) and the next covers [40, 45): exactly 5-second tumbling windows.

Sliding window

Requirement: the window size must be an integer multiple of the slide step (internally the window is optimized into several small tumbling windows).

SELECT window_start, window_end, id, SUM(vc) sumVC
FROM TABLE(
  HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS, INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, id;

Looking at windows for a given id, e.g. id 2, the ranges are [55, 05), [00, 10), ...

The data matches a sliding window with a size of 10 seconds and a slide step of 5 seconds.

Cumulative window

A cumulative window accumulates results within a statistics period. It has two core parameters: the maximum window size (max window size) and the accumulation step (step). The maximum window size is the "statistics period": the span over which the final totals are computed.

Note: the maximum window size must be an integer multiple of the step.

SELECT 
window_start, 
window_end, 
id, 
SUM(vc) sumVC
FROM TABLE(
  CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '2' SECONDS, INTERVAL '6' SECONDS))
GROUP BY window_start, window_end, id;

For id 1 the window ranges are [36, 38), [36, 40), [36, 42), [42, 44), ...

which matches the behavior of a cumulative window.

Multidimensional analysis

SELECT 
window_start, 
window_end, 
id, 
SUM(vc) sumVC
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end,
rollup( (id) )
--  cube( (id) )
--  grouping sets( (id),()  )
;

In multidimensional analysis, rollup means "rolling up": aggregating data to a coarser granularity to obtain coarser-grained aggregates.

Taking the [00, 05) window from the output: aggregated per id, id=1 sums to 860, id=2 to 907, and id=3 to 727; rolled up to the coarser granularity (id no longer distinguished, so id is NULL), the total is 2494 (860 + 907 + 727 = 2494).

OVER aggregation

An OVER aggregation computes an aggregate value over an ordered range of rows for every input row. Unlike GROUP BY aggregation, it does not collapse each group into a single result row; instead it produces one aggregated value per input row. Over windows can be defined on event time or processing time, and their range can be specified as a time interval or a row count.

Syntax:

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

ORDER BY: must be a time attribute column, and only ascending order is allowed.

range_definition: the range of rows the aggregate covers. There are two ways to specify it: 1. by row count, 2. by time interval.

Example

Aggregate by time interval: for each sensor, count the water-level readings (vc) received from 10 seconds ago up to now.

SELECT 
    id, 
    et, 
    vc,
    count(vc) OVER (
        PARTITION BY id
        ORDER BY et
        RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
  ) AS cnt
FROM ws;

An OVER window can also be defined once in a WINDOW clause outside the SELECT list, which makes it reusable:

SELECT 
    id, 
    et, 
    vc,
count(vc) OVER w AS cnt,
sum(vc) OVER w AS sumVC
FROM ws
WINDOW w AS (
    PARTITION BY id
    ORDER BY et
    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
);

Aggregate by row count: for each sensor, compute the average water level over the previous 5 rows up to the current row.

SELECT 
    id, 
    et, 
    vc,
    avg(vc) OVER (
        PARTITION BY id
        ORDER BY et
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS avgVC
FROM ws;

Again, the window can be defined in a WINDOW clause outside the SELECT list:

SELECT 
    id, 
    et, 
    vc,
avg(vc) OVER w AS avgVC,
count(vc) OVER w AS cnt
FROM ws
WINDOW w AS (
    PARTITION BY id
    ORDER BY et
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
);

Special syntax: Top-N

ROW_NUMBER(): ranks the rows and tags each row with its position in the sort order. Flink treats a query as Top-N when an outer query filters on that row number with WHERE rownum <= N; a sketch of the pattern follows.
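As a minimal sketch of this pattern (written against the ws table defined earlier, not taken from the original post), the following keeps the top 3 readings per sensor, ordered by vc:

SELECT id, et, vc, rownum
FROM (
    SELECT id, et, vc,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY vc DESC) AS rownum
    FROM ws
)
WHERE rownum <= 3;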

Using Hive built-in functions

Load the Hive module:

Flink SQL> load module hive with ('hive-version'='3.1.3');
[INFO] Execute statement succeed.

Flink SQL> show modules;
+-------------+
| module name |
+-------------+
|        core |
|        hive |
+-------------+
2 rows in set

Flink SQL> show functions;

The function list is now noticeably longer, which shows Hive's functions have been loaded.

Test one of Hive's built-in functions:

select split('a:b', ':');

The result is the array [a, b] (screenshot omitted).

Commonly used connector reads and writes

• kafka

• file

• jdbc

Typical DDL for these connectors is sketched below.
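The section above only names the connectors, so as a hedged illustration (not from the original post), here is what typical Kafka, filesystem, and JDBC table DDL looks like. The topic, broker addresses, path, URL, and credentials are placeholders:

-- Kafka source/sink (requires the flink-sql-connector-kafka jar on the classpath)
CREATE TABLE kafka_t (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'ws',                                 -- placeholder topic
    'properties.bootstrap.servers' = 'node2:9092',  -- placeholder brokers
    'properties.group.id' = 'test-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- Filesystem source/sink
CREATE TABLE file_t (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://node2:8020/flink/ws',          -- placeholder path
    'format' = 'csv'
);

-- JDBC sink (requires the flink-connector-jdbc jar and the database driver)
CREATE TABLE jdbc_t (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://node2:3306/test',         -- placeholder URL
    'table-name' = 'ws',
    'username' = 'root',                            -- placeholder credentials
    'password' = '000000'
);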

Using Flink SQL in code

To use the Table API in code, the following dependency must be added:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>

This dependency is a "bridge" that connects the Table API to the underlying DataStream API; it comes in Java and Scala variants.

To run Table API and SQL programs inside a local IDE, the following dependencies are also needed:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>

Case 1

Create a package (e.g. org.example.sql) to hold the Flink SQL Java code:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class SqlDemo {
    public static void main(String[] args) {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // create the tables
        tableEnv.executeSql("CREATE TABLE source(\n" +
                "id INT, \n" +
                "ts BIGINT, \n"+
                "vc INT\n"+
                ")WITH(\n" +
                "    'connector' = 'datagen', \n" +
                "    'rows-per-second'='1', \n" +
                "    'fields.id.kind'='random', \n" +
                "    'fields.id.min'='1', \n" +
                "    'fields.id.max'='10', \n" +
                "    'fields.ts.kind'='sequence', \n" +
                "    'fields.ts.start'='1', \n" +
                "    'fields.ts.end'='1000000', \n" +
                "    'fields.vc.kind'='random', \n" +
                "    'fields.vc.min'='1', \n" +
                "    'fields.vc.max'='100'\n" +
                ");\n");
        tableEnv.executeSql("CREATE TABLE sink (\n" +
                "    id INT, \n" +
                "    sumVC INT \n" +
                ") WITH (\n" +
                "'connector' = 'print'\n" +
                ");\n");
        // run queries
        //    1. SQL-style query
        Table table = tableEnv.sqlQuery("select id, sum(vc) as sumVC from source where id>5 group by id;");
        //   register the Table object under a name so SQL can reference it
        tableEnv.createTemporaryView("tmp", table);
        tableEnv.sqlQuery("select * from tmp where id>7");
        //    2. Table API-style query
//        Table source = tableEnv.from("source");
//        Table result = source
//                .where($("id").isGreater(5))
//                .groupBy($("id"))
//                .aggregate($("vc").sum().as("sumVC"))
//                .select($("id"), $("sumVC"));
        // write to the sink table
        // SQL style
        tableEnv.executeSql("insert into sink select * from tmp");
        // Table API style
//        result.executeInsert("sink");
    }
}

Run the program in IDEA (output screenshot omitted).

On inspection, the printed rows come from tableEnv.executeSql("insert into sink select * from tmp").

Case 2

import java.util.Objects;

public class WaterSensor {
    public String id;   // sensor id
    public Long ts;     // record timestamp
    public Integer vc;  // water level reading
    public WaterSensor() {
    }
    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public Long getTs() {
        return ts;
    }
    public void setTs(Long ts) {
        this.ts = ts;
    }
    public Integer getVc() {
        return vc;
    }
    public void setVc(Integer vc) {
        this.vc = vc;
    }
    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) &&
                Objects.equals(ts, that.ts) &&
                Objects.equals(vc, that.vc);
    }
    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // TODO 1. stream -> table
        Table sensorTable = tableEnv.fromDataStream(sensorDS);
        tableEnv.createTemporaryView("sensor", sensorTable);
        Table filterTable = tableEnv.sqlQuery("select id,ts,vc from sensor where ts>2");
        Table sumTable = tableEnv.sqlQuery("select id,sum(vc) from sensor group by id");
        // TODO 2. table -> stream
        // 2.1 append-only stream
        tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");
        // 2.2 changelog stream (the aggregate results need updates)
        tableEnv.toChangelogStream(sumTable).print("sum");
        // execute() is required whenever the DataStream API is used; otherwise it is not
        env.execute();
    }
}

Running it prints the append-only filter stream and the changelog sum stream (screenshot omitted).

Case 3

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

public class MyTableFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> strDS = env.fromElements(
                "hello flink",
                "hello world hi",
                "hello java"
        );
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table sensorTable = tableEnv.fromDataStream(strDS, $("words"));
        tableEnv.createTemporaryView("str", sensorTable);
        // TODO 2. register the function
        tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class);
        // TODO 3. call the user-defined function
        tableEnv
                // 3.1 cross join (Cartesian product)
//                .sqlQuery("select words,word,length from str,lateral table(SplitFunction(words))")
                // 3.2 left join with an ON TRUE condition
//                .sqlQuery("select words,word,length from str left join lateral table(SplitFunction(words)) on true")
                // rename the fields of the lateral table
                .sqlQuery("select words,newWord,newLength from str left join lateral table(SplitFunction(words))  as T(newWord,newLength) on true")
                .execute()
                .print();
    }
    // TODO 1. extend TableFunction
    // type annotation: each Row contains two fields, word and length
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {
        // the return type is void; results are emitted with collect()
        public void eval(String str) {
            for (String word : str.split(" ")) {
                collect(Row.of(word, word.length()));
            }
        }
    }
}

Running it prints one output row per word, joined back to the original line (screenshot omitted).

Case 4

Compute each student's weighted average score from the score table ScoreTable.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import static org.apache.flink.table.api.Expressions.$;

public class MyAggregateFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // name, score, weight
        DataStreamSource<Tuple3<String, Integer, Integer>> scoreWeightDS = env.fromElements(
                Tuple3.of("zs", 80, 3),
                Tuple3.of("zs", 90, 4),
                Tuple3.of("zs", 95, 4),
                Tuple3.of("ls", 75, 4),
                Tuple3.of("ls", 65, 4),
                Tuple3.of("ls", 85, 4)
        );
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table scoreWeightTable = tableEnv.fromDataStream(scoreWeightDS, $("f0").as("name"), $("f1").as("score"), $("f2").as("weight"));
        tableEnv.createTemporaryView("scores", scoreWeightTable);
        // TODO 2. register the function
        tableEnv.createTemporaryFunction("WeightedAvg", WeightedAvg.class);
        // TODO 3. call the user-defined function
        tableEnv
                .sqlQuery("select name,WeightedAvg(score,weight)  from scores group by name")
                .execute()
                .print();
    }

    // TODO 1. extend AggregateFunction
    public static class WeightedAvg extends AggregateFunction<Double, Tuple2<Integer, Integer>> {
        @Override
        public Double getValue(Tuple2<Integer, Integer> acc) {
            return acc.f0 * 1D / acc.f1;
        }
        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return Tuple2.of(0, 0);
        }
        /**
         * Accumulates one input row; called once per row.
         * @param acc the accumulator
         * @param score first argument: the score
         * @param weight second argument: the weight
         */
        public void accumulate(Tuple2<Integer, Integer> acc, Integer score, Integer weight) {
            acc.f0 += score * weight;  // weighted sum = score1 * weight1 + score2 * weight2 + ...
            acc.f1 += weight;          // weight sum = weight1 + weight2 + ...
        }
    }
}

Running it prints each student's weighted average (screenshot omitted).
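As a sanity check on the accumulate logic (these numbers follow from the sample data above, not from the original screenshot): for zs the weighted average is (80×3 + 90×4 + 95×4) / (3 + 4 + 4) = 980 / 11 ≈ 89.09, and for ls it is (75×4 + 65×4 + 85×4) / 12 = 900 / 12 = 75.0.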

Case 5

Table aggregate function

A user-defined table aggregate function (UDTAGG) aggregates one or more rows (i.e. a table) into another table, whose result may have multiple rows and columns. It is effectively a combination of a table function and an aggregate function: a "many-to-many" transformation.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class MyTableAggregateFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // input numbers
        DataStreamSource<Integer> numDS = env.fromElements(3, 6, 12, 5, 8, 9, 4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table numTable = tableEnv.fromDataStream(numDS, $("num"));
        // TODO 2. register the function
        tableEnv.createTemporaryFunction("Top2", Top2.class);
        // TODO 3. call the user-defined function: only the Table API supports this
        numTable
                .flatAggregate(call("Top2", $("num")).as("value", "rank"))
                .select($("value"), $("rank"))
                .execute().print();
    }

    // TODO 1. extend TableAggregateFunction
    // result type (value, rank), e.g. (12,1) (9,2)
    // accumulator type (largest value, second-largest value), e.g. (12,9)
    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return Tuple2.of(0, 0);
        }
        /**
         * Called once per input row; keeps the two largest values seen so far in the accumulator.
         *
         * @param acc the accumulator
         * @param num the incoming value
         */
        public void accumulate(Tuple2<Integer, Integer> acc, Integer num) {
            if (num > acc.f0) {
                // the new value becomes the largest; the old largest becomes second
                acc.f1 = acc.f0;
                acc.f0 = num;
            } else if (num > acc.f1) {
                // the new value replaces the second-largest
                acc.f1 = num;
            }
        }

        /**
         * Emits the result: the two largest values as (value, rank) rows.
         *
         * @param acc the accumulator
         * @param out the collector
         */
        public void emitValue(Tuple2<Integer, Integer> acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.f0 != 0) {
                out.collect(Tuple2.of(acc.f0, 1));
            }
            if (acc.f1 != 0) {
                out.collect(Tuple2.of(acc.f1, 2));
            }
        }
    }
}

Running it produces:

+----+-------------+-------------+
| op |       value |        rank |
+----+-------------+-------------+
| +I |           3 |           1 |
| -D |           3 |           1 |
| +I |           6 |           1 |
| +I |           3 |           2 |
| -D |           6 |           1 |
| -D |           3 |           2 |
| +I |          12 |           1 |
| +I |           6 |           2 |
| -D |          12 |           1 |
| -D |           6 |           2 |
| +I |          12 |           1 |
| +I |           6 |           2 |
| -D |          12 |           1 |
| -D |           6 |           2 |
| +I |          12 |           1 |
| +I |           8 |           2 |
| -D |          12 |           1 |
| -D |           8 |           2 |
| +I |          12 |           1 |
| +I |           9 |           2 |
| -D |          12 |           1 |
| -D |           9 |           2 |
| +I |          12 |           1 |
| +I |           9 |           2 |
+----+-------------+-------------+
24 rows in set

Done. Enjoy!
