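Flink SQL in Practice
Environment Setup
Option 1: SQL Client on a Standalone Flink Cluster
Start the Flink cluster and the SQL client
[hadoop@node2 ~]$ start-cluster.sh
[hadoop@node2 ~]$ sql-client.sh
... log output omitted ...
Flink SQL>
Option 2: SQL Client on a YARN Session Flink Cluster
Start the Hadoop cluster
[hadoop@node2 ~]$ myhadoop.sh start
Start a Flink cluster with a YARN session
[hadoop@node2 ~]$ yarn-session.sh -d
Start a SQL client attached to the YARN session
[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
... log output omitted ...
Flink SQL>
When the "Flink SQL>" prompt appears, the Flink SQL client has started successfully and is ready for SQL statements.
Note: choose one of the two options above for the rest of this walkthrough.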
Database Operations
Flink SQL> show databases;
+------------------+
| database name    |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> create database mydatabase;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+------------------+
| database name    |
+------------------+
| default_database |
| mydatabase       |
+------------------+
2 rows in set

Flink SQL> show current database;
+-----------------------+
| current database name |
+-----------------------+
| default_database      |
+-----------------------+
1 row in set

Switch the current database
Flink SQL> use mydatabase;
[INFO] Execute statement succeed.

Flink SQL> show current database;
+-----------------------+
| current database name |
+-----------------------+
| mydatabase            |
+-----------------------+
1 row in set

Flink SQL> quit;
... ... ...
[hadoop@node2 ~]$
Table DDL Operations
Creating Tables
CREATE TABLE
Create the test table
CREATE TABLE test(
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'print'
);
LIKE
Create test1 based on the test table, adding a value column
CREATE TABLE test1 (
  `value` STRING
)
LIKE test;
List the tables
show tables;
Show the structure of the test table
desc test;
Show the structure of the test1 table
desc test1;
Session transcript
Flink SQL> CREATE TABLE test(
> id INT,
> ts BIGINT,
> vc INT
> ) WITH (
> 'connector' = 'print'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE test1 (
> `value` STRING
> )
> LIKE test;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+------------+
| table name |
+------------+
| test       |
| test1      |
+------------+
2 rows in set

Flink SQL> desc test;
+------+--------+------+-----+--------+-----------+
| name | type   | null | key | extras | watermark |
+------+--------+------+-----+--------+-----------+
| id   | INT    | TRUE |     |        |           |
| ts   | BIGINT | TRUE |     |        |           |
| vc   | INT    | TRUE |     |        |           |
+------+--------+------+-----+--------+-----------+
3 rows in set

Flink SQL> desc test1;
+-------+--------+------+-----+--------+-----------+
| name  | type   | null | key | extras | watermark |
+-------+--------+------+-----+--------+-----------+
| id    | INT    | TRUE |     |        |           |
| ts    | BIGINT | TRUE |     |        |           |
| vc    | INT    | TRUE |     |        |           |
| value | STRING | TRUE |     |        |           |
+-------+--------+------+-----+--------+-----------+
4 rows in set
CTAS
CTAS: CREATE TABLE AS SELECT
create table test2 as select id, ts from test;
This fails here because the test table uses the print connector, which can only act as a sink and not as a source.
Flink SQL> create table test2 as select id, ts from test;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.
Altering Tables
Renaming a table
alter table test1 rename to test11;
Session transcript
Flink SQL> alter table test1 rename to test11;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+------------+
| table name |
+------------+
| test       |
| test11     |
+------------+
2 rows in set
Adding a column
Create the test2 table
CREATE TABLE test2(
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'print'
);
Show the structure of the test2 table
desc test2;
Add a column and place it first
ALTER TABLE test2 ADD `status` INT COMMENT 'status descriptor' FIRST;
Show the structure of the test2 table again
desc test2;
Session transcript
Flink SQL> CREATE TABLE test2(
> id INT,
> ts BIGINT,
> vc INT
> ) WITH (
> 'connector' = 'print'
> );
[INFO] Execute statement succeed.

Flink SQL> desc test2;
+------+--------+------+-----+--------+-----------+
| name | type   | null | key | extras | watermark |
+------+--------+------+-----+--------+-----------+
| id   | INT    | TRUE |     |        |           |
| ts   | BIGINT | TRUE |     |        |           |
| vc   | INT    | TRUE |     |        |           |
+------+--------+------+-----+--------+-----------+
3 rows in set

Flink SQL> ALTER TABLE test2 ADD `status` INT COMMENT 'status descriptor' FIRST;
[INFO] Execute statement succeed.

Flink SQL> desc test2;
+--------+--------+------+-----+--------+-----------+-------------------+
| name   | type   | null | key | extras | watermark | comment           |
+--------+--------+------+-----+--------+-----------+-------------------+
| status | INT    | TRUE |     |        |           | status descriptor |
| id     | INT    | TRUE |     |        |           |                   |
| ts     | BIGINT | TRUE |     |        |           |                   |
| vc     | INT    | TRUE |     |        |           |                   |
+--------+--------+------+-----+--------+-----------+-------------------+
4 rows in set
Modifying columns
Change the type, nullability, and comment of existing columns
ALTER TABLE test2 MODIFY (vc DOUBLE NOT NULL, status STRING COMMENT 'status desc');
Show the table structure
desc test2;
Session transcript
Flink SQL> ALTER TABLE test2 MODIFY (vc DOUBLE NOT NULL, status STRING COMMENT 'status desc');
[INFO] Execute statement succeed.

Flink SQL> desc test2;
+--------+--------+-------+-----+--------+-----------+-------------+
| name   | type   | null  | key | extras | watermark | comment     |
+--------+--------+-------+-----+--------+-----------+-------------+
| status | STRING | TRUE  |     |        |           | status desc |
| id     | INT    | TRUE  |     |        |           |             |
| ts     | BIGINT | TRUE  |     |        |           |             |
| vc     | DOUBLE | FALSE |     |        |           |             |
+--------+--------+-------+-----+--------+-----------+-------------+
4 rows in set
Dropping columns
Drop the ts and status columns
ALTER TABLE test2 DROP (ts, status);
Show the table structure
desc test2;
Session transcript
Flink SQL> ALTER TABLE test2 DROP (ts, status);
[INFO] Execute statement succeed.

Flink SQL> desc test2;
+------+--------+-------+-----+--------+-----------+
| name | type   | null  | key | extras | watermark |
+------+--------+-------+-----+--------+-----------+
| id   | INT    | TRUE  |     |        |           |
| vc   | DOUBLE | FALSE |     |        |           |
+------+--------+-------+-----+--------+-----------+
2 rows in set
Dropping Tables
Syntax
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
Example
drop table if exists test2;
Session transcript
Flink SQL> drop table if exists test2;
[INFO] Execute statement succeed.
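Table DML Query Operations
Select
select
A SELECT test and the result display mode
SELECT 'Hello World', 'It''s me';
Note: string literals after SELECT must be enclosed in single quotes. If the string itself contains a single quote, escape it by doubling it (for example, the text It's me is written as 'It''s me').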
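The quoting rule above can be sketched in plain Java. This is a minimal illustration, not part of Flink: the class name SqlLiteral and the helper quote are hypothetical, and the helper only handles the single-quote doubling described in the note.

```java
/** Sketch: turn a Java string into a SQL string literal by doubling single quotes. */
class SqlLiteral {

    // Hypothetical helper: wraps the text in single quotes and doubles any embedded ones.
    static String quote(String raw) {
        return "'" + raw.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        // prints: SELECT 'Hello World', 'It''s me';
        System.out.println("SELECT " + quote("Hello World") + ", " + quote("It's me") + ";");
    }
}
```

For values that come from users, parameter binding is generally preferable to string concatenation; the sketch only shows the literal syntax.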
The result opens in a full-screen table view. Press q to return to the command line.
Setting the result display mode
The result display mode defaults to table; it can also be set to tableau or changelog.
- Set the result display mode to tableau
SET sql-client.execution.result-mode=tableau;
Session transcript
Flink SQL> SET sql-client.execution.result-mode=tableau;
[INFO] Execute statement succeed.

Flink SQL> SELECT 'Hello World', 'It''s me';
... log output omitted ...
+----+--------------------------------+--------------------------------+
| op |                         EXPR$0 |                         EXPR$1 |
+----+--------------------------------+--------------------------------+
| +I |                    Hello World |                        It's me |
+----+--------------------------------+--------------------------------+
Received a total of 1 row
- Set the result display mode to changelog
SET sql-client.execution.result-mode=changelog;
Session transcript
Flink SQL> SET sql-client.execution.result-mode=changelog;
[INFO] Execute statement succeed.

Flink SQL> SELECT 'Hello World', 'It''s me';
... log output omitted ...
Choose whichever result display mode you prefer.
Source Table
Create a source table with the data generator (datagen) connector
CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second'='1',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='10',
  'fields.ts.kind'='sequence',
  'fields.ts.start'='1',
  'fields.ts.end'='1000000',
  'fields.vc.kind'='random',
  'fields.vc.min'='1',
  'fields.vc.max'='100'
);
Query the source table
select * from source;
Press Ctrl+C to end the query.
A projection with a computed column works the same way:
SELECT id, vc + 10 FROM source;
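Sink Table
Create the sink table
CREATE TABLE sink (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'print'
);
Insert the rows queried from the source table into the sink table
INSERT INTO sink select * from source;
Querying the sink table directly fails with the following error:
Flink SQL> select * from sink;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.
To see the data instead, open the YARN web UI on port 8088 and follow the Application Master link to the Flink Web UI, where one running job is listed.
The output is visible through the Task Manager of this running job.
Cancel the job when done.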
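select where
SELECT id FROM source WHERE id >5;
WITH Clause
WITH provides a way to write auxiliary statements for use in a larger query. These statements are commonly called Common Table Expressions (CTEs) and can be thought of as temporary views that exist only for a single query.
WITH source_with_total AS (
  SELECT id, vc+10 AS total
  FROM source
)
SELECT id, SUM(total)
FROM source_with_total
GROUP BY id;
Group Aggregation
SELECT vc, COUNT(*) as cnt FROM source GROUP BY vc;
In the output, -U marks a retraction: the previous result for a key is withdrawn before the updated result (+U) is emitted.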
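To make the retraction behaviour concrete, the following plain-Java sketch replays a COUNT(*) ... GROUP BY vc over a few input values and prints the kind of changelog the text describes. It is a simulation under simple assumptions, not Flink code: the class RetractionSketch, the method changelog, and the sample input are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: changelog of a grouped count, with -U/+U pairs when a key's count changes. */
class RetractionSketch {

    static List<String> changelog(int[] vcValues) {
        Map<Integer, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int vc : vcValues) {
            Long old = counts.get(vc);
            if (old == null) {
                // First row for this key: a plain insert.
                counts.put(vc, 1L);
                out.add("+I(" + vc + ",1)");
            } else {
                // Key seen before: retract the old count, then emit the new one.
                out.add("-U(" + vc + "," + old + ")");
                counts.put(vc, old + 1);
                out.add("+U(" + vc + "," + (old + 1) + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints +I(5,1), +I(7,1), -U(5,1), +U(5,2), one per line
        changelog(new int[] {5, 7, 5}).forEach(System.out::println);
    }
}
```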
Create the source1 table
CREATE TABLE source1 (
  dim STRING,
  user_id BIGINT,
  price BIGINT,
  row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
  WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
);
Create the sink1 table
CREATE TABLE sink1 (
  dim STRING,
  pv BIGINT,
  sum_price BIGINT,
  max_price BIGINT,
  min_price BIGINT,
  uv BIGINT,
  window_start bigint
) WITH (
  'connector' = 'print'
);
Aggregate the source1 table by group and insert the result into sink1
insert into sink1
select dim,
  count(*) as pv,
  sum(price) as sum_price,
  max(price) as max_price,
  min(price) as min_price,
  -- number of distinct users (uv)
  count(distinct user_id) as uv,
  cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source1
group by
  dim,
  -- UNIX_TIMESTAMP returns a timestamp in seconds; dividing it by 60 buckets rows into 1-minute groups
  cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);
View the output in the Web UI, then cancel the job there.
Multidimensional Analysis
Group aggregation also supports GROUPING SETS, ROLLUP, and CUBE. The following example uses GROUPING SETS:
SELECT
  supplier_id,
  rating,
  product_id,
  COUNT(*)
FROM (
  VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4)
)
-- supplier id, product id, rating
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS(
  (supplier_id, product_id, rating),
  (supplier_id, product_id),
  (supplier_id, rating),
  (supplier_id),
  (product_id, rating),
  (product_id),
  (rating),
  ()
);
Group Window Aggregation
Prepare the data
CREATE TABLE ws (
  id INT,
  vc INT,
  pt AS PROCTIME(), -- processing time
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), -- event time
  WATERMARK FOR et AS et - INTERVAL '5' SECOND -- watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.min' = '1',
  'fields.id.max' = '3',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
Tumbling window
Tumbling window (time attribute column, window size)
select
  id,
  TUMBLE_START(et, INTERVAL '5' SECOND) wstart,
  TUMBLE_END(et, INTERVAL '5' SECOND) wend,
  sum(vc) sumVc
from ws
group by id, TUMBLE(et, INTERVAL '5' SECOND);
The results are aggregated per id, and each window is 5 seconds long (wend - wstart). Press Q to exit the query.
Sliding window
Sliding window (time attribute column, slide step, window size)
select
  id,
  HOP_START(pt, INTERVAL '3' SECOND, INTERVAL '5' SECOND) wstart,
  HOP_END(pt, INTERVAL '3' SECOND, INTERVAL '5' SECOND) wend,
  sum(vc) sumVc
from ws
group by id, HOP(pt, INTERVAL '3' SECOND, INTERVAL '5' SECOND);
The results show a window size of 5 seconds; for the same id, each window starts 3 seconds after the previous one.
Session window
Session window (time attribute column, session gap)
select
  id,
  SESSION_START(et, INTERVAL '5' SECOND) wstart,
  SESSION_END(et, INTERVAL '5' SECOND) wend,
  sum(vc) sumVc
from ws
group by id, SESSION(et, INTERVAL '5' SECOND);
Because data is generated continuously, the 5-second gap without data that would close a session is never reached.
Note: group windows have largely been superseded by the more powerful window TVFs.
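Window Table-Valued Function (TVF) Aggregation
Compared with group windows (GroupWindow), TVF windows are more efficient and more powerful. They:
- offer more performance optimizations
- support the GROUPING SETS syntax
- allow Top-N inside window aggregation
- provide cumulative windows
A window table-valued function itself returns a table, so the window appears after FROM, while GROUP BY refers to the columns the window adds: window_start and window_end.
FROM TABLE(
  window_type(TABLE table_name, DESCRIPTOR(time_column), INTERVAL duration ...)
)
GROUP BY [window_start,] [window_end,] -- optional
Tumbling window
SELECT
  window_start,
  window_end,
  id,
  SUM(vc) sumVC
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, id;
In the results, the first window for id 2 covers [35,40) and the second covers [40,45), which is exactly a tumbling window of 5 seconds.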
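The window boundaries follow from simple arithmetic on the timestamp. The sketch below is a plain-Java illustration that assumes a zero window offset and uses seconds as the unit; the class TumbleSketch and its method are hypothetical names, not Flink API.

```java
/** Sketch: which tumbling window a timestamp falls into, assuming a zero offset. */
class TumbleSketch {

    // Start of the window of the given size that contains ts (inclusive start).
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    public static void main(String[] args) {
        long size = 5;
        for (long ts : new long[] {37, 40, 44}) {
            long start = windowStart(ts, size);
            // prints: 37 -> [35,40), 40 -> [40,45), 44 -> [40,45)
            System.out.println(ts + " -> [" + start + "," + (start + size) + ")");
        }
    }
}
```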
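Sliding window
Requirement: the window size must be an integer multiple of the slide step (internally the window is optimized into several small tumbling windows).
SELECT
  window_start,
  window_end,
  id,
  SUM(vc) sumVC
FROM TABLE(
  HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS, INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, id;
Look at the windows for a single id, for example id 2: the time ranges are [55,05), [00,10), ...
This matches a sliding window with a size of 10 seconds and a slide step of 5 seconds.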
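Because the windows overlap, one row belongs to several of them. The following plain-Java sketch lists the windows that contain a given timestamp; it assumes a zero offset and second granularity, and HopSketch and windows are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: all sliding windows [start, end) that contain a timestamp, assuming a zero offset. */
class HopSketch {

    static List<long[]> windows(long ts, long slide, long size) {
        List<long[]> result = new ArrayList<>();
        // Latest window start that is not after ts.
        long lastStart = ts - Math.floorMod(ts, slide);
        // Walk backwards one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            result.add(new long[] {start, start + size});
        }
        return result;
    }

    public static void main(String[] args) {
        // With slide 5 and size 10, timestamp 7 falls into [5,15) and [0,10).
        for (long[] w : windows(7, 5, 10)) {
            System.out.println("[" + w[0] + "," + w[1] + ")");
        }
    }
}
```

With a size of 10 and a slide of 5, every timestamp lands in size / slide = 2 windows.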
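Cumulative window
A cumulative window accumulates results within a fixed statistics period. It has two core parameters: the maximum window size (max window size) and the accumulation step (step). The maximum window size is the "statistics period": the goal is to aggregate the data of that whole period, with intermediate results emitted at every step.
Note: the maximum window size must be an integer multiple of the accumulation step.
SELECT
  window_start,
  window_end,
  id,
  SUM(vc) sumVC
FROM TABLE(
  CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '2' SECONDS, INTERVAL '6' SECONDS))
GROUP BY window_start, window_end, id;
In the results, the windows for id 1 are [36,38), [36,40), [36,42), [42,44), ...
This matches the behaviour of a cumulative window.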
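The sequence [36,38), [36,40), [36,42), [42,44) can be reproduced with a small plain-Java sketch. It assumes a zero offset and second granularity; CumulateSketch and windows are hypothetical names, and the code only illustrates the window arithmetic, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: cumulative windows [start, end) that contain a timestamp, assuming a zero offset. */
class CumulateSketch {

    static List<long[]> windows(long ts, long step, long maxSize) {
        List<long[]> result = new ArrayList<>();
        // All windows of one period share the same start.
        long start = ts - Math.floorMod(ts, maxSize);
        // The end grows by one step at a time up to the maximum size.
        for (long end = start + step; end <= start + maxSize; end += step) {
            if (end > ts) {
                result.add(new long[] {start, end});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Step 2, maximum size 6: timestamp 37 is counted in [36,38), [36,40), [36,42).
        for (long[] w : windows(37, 2, 6)) {
            System.out.println("[" + w[0] + "," + w[1] + ")");
        }
    }
}
```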
Multidimensional analysis
SELECT
  window_start,
  window_end,
  id,
  SUM(vc) sumVC
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end,
  rollup( (id) )
  -- cube( (id) )
  -- grouping sets( (id),() )
;
In multidimensional analysis, rollup means aggregating data further at a specified granularity to obtain coarser-grained results.
Taking the [00,05) window from the results: aggregated by id, id=1 sums to 860, id=2 to 907, and id=3 to 727. Rolling up to the coarser granularity (id is no longer distinguished and shows as NULL) gives 2494 (860+907+727=2494).
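The rollup row is simply the sum of the per-id rows of the same window. A minimal plain-Java check of the arithmetic quoted above (the class name RollupSketch is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: per-id sums of one window plus the rolled-up total (id = NULL). */
class RollupSketch {

    static int total(Map<Integer, Integer> sumById) {
        int total = 0;
        for (int sum : sumById.values()) {
            total += sum;
        }
        return total;
    }

    public static void main(String[] args) {
        // Per-id sums of the [00,05) window, taken from the text.
        Map<Integer, Integer> sumById = new LinkedHashMap<>();
        sumById.put(1, 860);
        sumById.put(2, 907);
        sumById.put(3, 727);

        sumById.forEach((id, sum) -> System.out.println("id=" + id + " sumVC=" + sum));
        // prints: id=NULL sumVC=2494
        System.out.println("id=NULL sumVC=" + total(sumById));
    }
}
```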
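Over Aggregation
An OVER aggregation computes an aggregate value for every input row over a range of ordered rows. Unlike a GROUP BY aggregation, it does not reduce the result rows of each group to a single row; it produces one aggregate value per input row. Over windows can be defined on event time or processing time, over a range given either as a time interval or as a row count.
Syntax
SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...
ORDER BY: must be a time attribute column, ascending order only
range_definition: specifies which rows are aggregated; there are two ways to define it: (1) by row count, (2) by time interval
Examples
Aggregating over a time interval: count the water-level (vc) records each sensor received from 10 seconds ago up to now.
SELECT
  id,
  et,
  vc,
  count(vc) OVER (
    PARTITION BY id
    ORDER BY et
    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
  ) AS cnt
FROM ws;
A WINDOW clause can also define the OVER window once, outside the SELECT list, so that it can be reused:
SELECT
  id,
  et,
  vc,
  count(vc) OVER w AS cnt,
  sum(vc) OVER w AS sumVC
FROM ws
WINDOW w AS (
  PARTITION BY id
  ORDER BY et
  RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
);
Aggregating over a row count: compute each sensor's average water level over the current row and the 5 rows before it.
SELECT
  id,
  et,
  vc,
  avg(vc) OVER (
    PARTITION BY id
    ORDER BY et
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS avgVC
FROM ws;
The same query with a WINDOW clause:
SELECT
  id,
  et,
  vc,
  avg(vc) OVER w AS avgVC,
  count(vc) OVER w AS cnt
FROM ws
WINDOW w AS (
  PARTITION BY id
  ORDER BY et
  ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
);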
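ROWS BETWEEN 5 PRECEDING AND CURRENT ROW covers up to six rows: the current one plus the five before it. The plain-Java sketch below shows that frame for the rows of a single partition, already ordered by time. It is an illustration only: RowsOverSketch and movingAvg are hypothetical names, and the sketch returns a floating-point mean, whereas the result type Flink produces for avg over an INT column may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch: one output value per input row, averaged over the current row and N preceding rows. */
class RowsOverSketch {

    static double[] movingAvg(int[] values, int preceding) {
        Deque<Integer> frame = new ArrayDeque<>();
        long sum = 0;
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            frame.addLast(values[i]);
            sum += values[i];
            // Keep at most (preceding + 1) rows in the frame.
            if (frame.size() > preceding + 1) {
                sum -= frame.removeFirst();
            }
            out[i] = (double) sum / frame.size();
        }
        return out;
    }

    public static void main(String[] args) {
        double[] avg = movingAvg(new int[] {1, 2, 3, 4, 5, 6, 7}, 5);
        // The sixth row averages 1..6 (3.5); the seventh drops the first row and averages 2..7 (4.5).
        System.out.println(avg[5] + " " + avg[6]);
    }
}
```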
Special Syntax: TOP-N
ROW_NUMBER(): numbers the rows after sorting, marking each row with its position in the sorted order
WHERE rownum
Flink SQL> load module hive with ('hive-version'='3.1.3');
[INFO] Execute statement succeed.

Flink SQL> show modules;
+-------------+
| module name |
+-------------+
| core        |
| hive        |
+-------------+
2 rows in set

Flink SQL> show functions;
More functions are listed than before, which shows that the Hive functions have been loaded.
Test a Hive built-in function
select split('a:b', ':');
Reading and Writing with Common Connectors
kafka
file
jdbc
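Using Flink SQL in Code
To use the Table API in code, the relevant dependency must be added.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>
This dependency is a Java "bridge". It connects the Table API with the underlying DataStream API and comes in a Java and a Scala variant.
To run Table API and SQL programs locally in an IDE, the following dependencies are also needed:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>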
Example 1
Create a package named sql to hold the Flink SQL Java code, for example org.example.sql.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class SqlDemo {
    public static void main(String[] args) {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create the tables
        tableEnv.executeSql("CREATE TABLE source(\n" +
                "id INT, \n" +
                "ts BIGINT, \n" +
                "vc INT\n" +
                ")WITH(\n" +
                " 'connector' = 'datagen', \n" +
                " 'rows-per-second'='1', \n" +
                " 'fields.id.kind'='random', \n" +
                " 'fields.id.min'='1', \n" +
                " 'fields.id.max'='10', \n" +
                " 'fields.ts.kind'='sequence', \n" +
                " 'fields.ts.start'='1', \n" +
                " 'fields.ts.end'='1000000', \n" +
                " 'fields.vc.kind'='random', \n" +
                " 'fields.vc.min'='1', \n" +
                " 'fields.vc.max'='100'\n" +
                ");\n");
        tableEnv.executeSql("CREATE TABLE sink (\n" +
                " id INT, \n" +
                " sumVC INT \n" +
                ") WITH (\n" +
                "'connector' = 'print'\n" +
                ");\n");

        // Run the query
        // 1. Query with SQL
        Table table = tableEnv.sqlQuery("select id, sum(vc) as sumVC from source where id>5 group by id;");
        // Register the Table object under a table name
        tableEnv.createTemporaryView("tmp", table);
        // The returned Table is not used, so this query produces no output
        tableEnv.sqlQuery("select * from tmp where id>7");

        // 2. Query with the Table API
        // Table source = tableEnv.from("source");
        // Table result = source
        //         .where($("id").isGreater(5))
        //         .groupBy($("id"))
        //         .aggregate($("vc").sum().as("sumVC"))
        //         .select($("id"), $("sumVC"));

        // Write to the sink table
        // SQL style
        tableEnv.executeSql("insert into sink select * from tmp");
        // Table API style
        // result.executeInsert("sink");
    }
}
Run the program in IDEA. Inspecting the output shows that the printed results come from tableEnv.executeSql("insert into sink select * from tmp").
Example 2
import java.util.Objects;

public class WaterSensor {
    public String id;  // water-level sensor id
    public Long ts;    // timestamp of the sensor reading
    public Integer vc; // recorded water level

    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public Integer getVc() {
        return vc;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) &&
                Objects.equals(ts, that.ts) &&
                Objects.equals(vc, that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Stream to table
        Table sensorTable = tableEnv.fromDataStream(sensorDS);
        tableEnv.createTemporaryView("sensor", sensorTable);

        Table filterTable = tableEnv.sqlQuery("select id,ts,vc from sensor where ts>2");
        Table sumTable = tableEnv.sqlQuery("select id,sum(vc) from sensor group by id");

        // 2. Table to stream
        // 2.1 Append-only stream
        tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");
        // 2.2 Changelog stream (results need to be updated)
        tableEnv.toChangelogStream(sumTable).print("sum");

        // execute() is required whenever the code uses the DataStream API; otherwise it is not needed
        env.execute();
    }
}
Example 3
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class MyTableFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> strDS = env.fromElements(
                "hello flink",
                "hello world hi",
                "hello java"
        );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table sensorTable = tableEnv.fromDataStream(strDS, $("words"));
        tableEnv.createTemporaryView("str", sensorTable);

        // 2. Register the function
        tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class);

        // 3. Call the user-defined function
        tableEnv
                // 3.1 Cross join (Cartesian product)
                // .sqlQuery("select words,word,length from str,lateral table(SplitFunction(words))")
                // 3.2 Left join with an "on true" condition
                // .sqlQuery("select words,word,length from str left join lateral table(SplitFunction(words)) on true")
                // 3.3 Rename the columns of the lateral table
                .sqlQuery("select words,newWord,newLength from str left join lateral table(SplitFunction(words)) as T(newWord,newLength) on true")
                .execute()
                .print();
    }

    // 1. Extend TableFunction
    // Type hint: the Row has two fields, word and length
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {
        // Returns void; rows are emitted through collect()
        public void eval(String str) {
            for (String word : str.split(" ")) {
                collect(Row.of(word, word.length()));
            }
        }
    }
}
Example 4
Compute each student's weighted average score from the score table ScoreTable.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import static org.apache.flink.table.api.Expressions.$;

public class MyAggregateFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // name, score, weight
        DataStreamSource<Tuple3<String, Integer, Integer>> scoreWeightDS = env.fromElements(
                Tuple3.of("zs", 80, 3),
                Tuple3.of("zs", 90, 4),
                Tuple3.of("zs", 95, 4),
                Tuple3.of("ls", 75, 4),
                Tuple3.of("ls", 65, 4),
                Tuple3.of("ls", 85, 4)
        );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table scoreWeightTable = tableEnv.fromDataStream(scoreWeightDS,
                $("f0").as("name"), $("f1").as("score"), $("f2").as("weight"));
        tableEnv.createTemporaryView("scores", scoreWeightTable);

        // 2. Register the function
        tableEnv.createTemporaryFunction("WeightedAvg", WeightedAvg.class);

        // 3. Call the user-defined function
        tableEnv
                .sqlQuery("select name,WeightedAvg(score,weight) from scores group by name")
                .execute()
                .print();
    }

    // 1. Extend AggregateFunction<result type, accumulator type>
    // Accumulator: (weighted sum, sum of weights)
    public static class WeightedAvg extends AggregateFunction<Double, Tuple2<Integer, Integer>> {

        @Override
        public Double getValue(Tuple2<Integer, Integer> acc) {
            return acc.f0 * 1D / acc.f1;
        }

        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return Tuple2.of(0, 0);
        }

        /**
         * Accumulation method, called once for every input row.
         * @param acc    the accumulator
         * @param score  first argument: the score
         * @param weight second argument: the weight
         */
        public void accumulate(Tuple2<Integer, Integer> acc, Integer score, Integer weight) {
            acc.f0 += score * weight; // weighted sum = score1 * weight1 + score2 * weight2 + ...
            acc.f1 += weight;         // sum of weights = weight1 + weight2 + ...
        }
    }
}
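The arithmetic the accumulator performs can be checked without Flink. The plain-Java sketch below applies the same formula (weighted sum divided by sum of weights) to the sample rows; the class WeightedAvgSketch is a hypothetical name, and the sketch computes only the final value per student, not the intermediate updates a streaming job would emit.

```java
/** Sketch: weighted average = sum(score * weight) / sum(weight), as in the accumulator. */
class WeightedAvgSketch {

    // Each row is {score, weight}.
    static double weightedAvg(int[][] scoreWeight) {
        int weightedSum = 0;
        int weightSum = 0;
        for (int[] row : scoreWeight) {
            weightedSum += row[0] * row[1];
            weightSum += row[1];
        }
        return weightedSum * 1D / weightSum;
    }

    public static void main(String[] args) {
        int[][] zs = {{80, 3}, {90, 4}, {95, 4}};
        int[][] ls = {{75, 4}, {65, 4}, {85, 4}};
        // zs: (240 + 360 + 380) / 11 = 980 / 11, roughly 89.09
        System.out.println("zs " + weightedAvg(zs));
        // ls: (300 + 260 + 340) / 12 = 75.0
        System.out.println("ls " + weightedAvg(ls));
    }
}
```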
Example 5
Table aggregate function
A user-defined table aggregate function (UDTAGG) aggregates one or more rows (that is, a table) into another table, and the result table can have multiple rows and columns. It is effectively a combination of a table function and an aggregate function: a "many-to-many" transformation.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class MyTableAggregateFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // input numbers
        DataStreamSource<Integer> numDS = env.fromElements(3, 6, 12, 5, 8, 9, 4);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table numTable = tableEnv.fromDataStream(numDS, $("num"));

        // 2. Register the function
        tableEnv.createTemporaryFunction("Top2", Top2.class);

        // 3. Call the user-defined function: only possible through the Table API
        numTable
                .flatAggregate(call("Top2", $("num")).as("value", "rank"))
                .select($("value"), $("rank"))
                .execute().print();
    }

    // 1. Extend TableAggregateFunction<result type, accumulator type>
    // Result type: (value, rank), e.g. (12,1) (9,2)
    // Accumulator type: (largest value, second-largest value), e.g. (12,9)
    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return Tuple2.of(0, 0);
        }

        /**
         * Called once per input row; compares values and keeps the two largest in acc.
         *
         * @param acc the accumulator
         * @param num the incoming value
         */
        public void accumulate(Tuple2<Integer, Integer> acc, Integer num) {
            if (num > acc.f0) {
                // The new value becomes first; the previous first becomes second
                acc.f1 = acc.f0;
                acc.f0 = num;
            } else if (num > acc.f1) {
                // The new value becomes second; the previous second is dropped
                acc.f1 = num;
            }
        }

        /**
         * Emits the result: the two largest values as (value, rank).
         *
         * @param acc the accumulator
         * @param out the collector
         */
        public void emitValue(Tuple2<Integer, Integer> acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.f0 != 0) {
                out.collect(Tuple2.of(acc.f0, 1));
            }
            if (acc.f1 != 0) {
                out.collect(Tuple2.of(acc.f1, 2));
            }
        }
    }
}
Run result
+----+-------------+-------------+
| op |       value |        rank |
+----+-------------+-------------+
| +I |           3 |           1 |
| -D |           3 |           1 |
| +I |           6 |           1 |
| +I |           3 |           2 |
| -D |           6 |           1 |
| -D |           3 |           2 |
| +I |          12 |           1 |
| +I |           6 |           2 |
| -D |          12 |           1 |
| -D |           6 |           2 |
| +I |          12 |           1 |
| +I |           6 |           2 |
| -D |          12 |           1 |
| -D |           6 |           2 |
| +I |          12 |           1 |
| +I |           8 |           2 |
| -D |          12 |           1 |
| -D |           8 |           2 |
| +I |          12 |           1 |
| +I |           9 |           2 |
| -D |          12 |           1 |
| -D |           9 |           2 |
| +I |          12 |           1 |
| +I |           9 |           2 |
+----+-------------+-------------+
24 rows in set
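The final two rows of the output, (12,1) and (9,2), can be reproduced by replaying the accumulate logic in plain Java. This sketch is not Flink code: Top2Sketch and top2 are hypothetical names, and like the accumulator it treats 0 as "no value yet", so it is only meaningful for positive inputs.

```java
/** Sketch: keep the two largest values seen so far, mirroring the accumulate logic. */
class Top2Sketch {

    // Returns {largest, secondLargest}; 0 stands for "no value yet".
    static int[] top2(int[] nums) {
        int first = 0;
        int second = 0;
        for (int num : nums) {
            if (num > first) {
                // New maximum: the old maximum moves to second place.
                second = first;
                first = num;
            } else if (num > second) {
                second = num;
            }
        }
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        int[] result = top2(new int[] {3, 6, 12, 5, 8, 9, 4});
        // prints: (12,1) (9,2)
        System.out.println("(" + result[0] + ",1) (" + result[1] + ",2)");
    }
}
```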
Done! Enjoy it!