Flink Windows from Theory to Practice | Big Data Technology
Author: 小叮当撩代码, CSDN rising-star creator in the backend field | Alibaba Cloud expert blogger
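Table of contents
- Flink Windows
- 😍 Windows
- 😎 Concept
- 🐯 Window control properties
- 🕹️ Skeleton of a window program
- ⏰ Window lifecycle
- ⌨️ Window types
- 💿 Time-based sliding and tumbling windows
- 📲 Tumbling window: TumblingWindow concept
- 💸 Sliding window: SlidingWindow concept
- 💡 Session windows
- 🩷 Hands-on code
- 🚀 Window Functions
- 🚦 Concept
- 🏖️ ReduceFunction
- 🏝️ AggregateFunction
- 🏜️ ProcessWindowFunction
- ⛰️ ProcessWindowFunction with incremental aggregation
- 🏔️ Incremental aggregation with ReduceFunction
- 🗻 Incremental aggregation with AggregateFunction
- 🧡 Triggers
- 💛 Evictors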
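Flink Windows
😍 Windows
😎 Concept
Flink treats batch as a special case of streaming, so its underlying engine is a streaming engine on top of which both stream processing and batch processing are implemented. Windows are the bridge from streaming to batch, and Flink provides a very complete windowing mechanism.
In Flink, a window is better understood as a bucket than as a frame. Flink slices the stream into buckets of finite size, and every element is dispatched to the bucket it belongs to. Once the condition that triggers the window computation is met, the data in the bucket is processed.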
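The bucket idea can be sketched in plain Java without any Flink dependency. This is only an illustration under simplifying assumptions: fixed-size buckets, non-negative millisecond timestamps, and no offset. The class and method names (`WindowBuckets`, `bucketStart`, `assign`) are hypothetical and are not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Flink class: shows how elements fall into fixed-size buckets.
class WindowBuckets {

    /** Start of the fixed-size bucket that contains the timestamp (non-negative timestamps assumed). */
    static long bucketStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Groups timestamps by the start of the bucket they belong to, in ascending bucket order. */
    static Map<Long, List<Long>> assign(long[] timestampsMs, long sizeMs) {
        Map<Long, List<Long>> buckets = new TreeMap<>();
        for (long ts : timestampsMs) {
            buckets.computeIfAbsent(bucketStart(ts, sizeMs), k -> new ArrayList<>()).add(ts);
        }
        return buckets;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_999L, 5_000L, 12_300L};
        // 5-second buckets: prints {0=[1000, 4999], 5000=[5000], 10000=[12300]}
        System.out.println(assign(timestamps, 5_000L));
    }
}
```

Each map entry plays the role of one bucket. In a real job the trigger decides when a bucket's contents are handed to the window function.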
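🐯 Window control properties
A window has two control properties: its length and its interval.
Window length (size): determines how much of the most recent data each computation covers.
Window interval: determines how often the computation runs.
Example: computing the trending search terms of the last 24 hours every 5 minutes. Here 24 hours is the length and 5 minutes is the interval.
🕹️ Skeleton of a window program
The rough skeleton of a Flink window application looks like this:
Keyed Window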
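When the interval is shorter than the length, consecutive windows overlap and a single element belongs to several of them. The plain-Java sketch below computes which window starts cover a given timestamp. It is a simplified illustration that assumes millisecond timestamps, no offset, and a length that is a multiple of the interval. `SlidingWindowSketch` and `windowStarts` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Flink class: lists the sliding windows that contain a timestamp.
class SlidingWindowSketch {

    /** Returns the start of every window [start, start + sizeMs) that contains ts, newest first. */
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs); // most recent window start at or before ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Length 10 s, interval 5 s: the element at 12 s is in two windows.
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L)); // [10000, 5000]

        // Length 24 h, interval 5 min, as in the trending-search example.
        long day = 24L * 60 * 60 * 1000;
        long fiveMinutes = 5L * 60 * 1000;
        System.out.println(windowStarts(90_000_000L, day, fiveMinutes).size()); // 288
    }
}
```

With these numbers each element contributes to 288 overlapping windows, which is why long windows with short intervals are usually paired with incremental aggregation instead of buffering every element per window.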
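```java
// Keyed Window
stream
    .keyBy(...)                   // keyed versus non-keyed windows
    .window(...)                  // required: "assigner"
   [.trigger(...)]                // optional: "trigger" (else default trigger)
   [.evictor(...)]                // optional: "evictor" (else no evictor)
   [.allowedLateness(...)]        // optional: "lateness" (else zero)
   [.sideOutputLateData(...)]     // optional: "output tag" (else no side output for late data)
    .reduce/aggregate/apply()     // required: "function"
   [.getSideOutput(...)]          // optional: "output tag"
```
💡 Session windows
```java
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);
```
🩷 Hands-on code
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Input lines hold a traffic-light id and the number of cars that passed it:
 * 9,3
 * 9,2
 */
public class NoWindowDemo { // class name is a placeholder; the original name is not preserved
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumption: the original source definition is not preserved;
        // a socket source with placeholder host and port is used here.
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        source.map(new MapFunction<String, CarInfo>() {
                    @Override
                    public CarInfo map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new CarInfo(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
                    }
                })
                .keyBy(CarInfo::getLightId)
                // Running sum per traffic light, without any window
                .sum("carNum")
                .print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CarInfo {
        // traffic-light id
        private int lightId;
        // number of cars that passed this traffic light
        private int carNum;
    }
}
```
```java
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Input lines hold a traffic-light id and the number of cars that passed it:
 * 9,3
 * 9,2
 * 9,7
 * 4,9
 * 2,6
 * 1,5
 * 2,3
 * 5,7
 * 5,4
 * Requirement 1: every 5 seconds, count the cars that passed each light in the last 5 seconds (time-based tumbling window)
 * Requirement 2: every 5 seconds, count the cars that passed each light in the last 10 seconds (time-based sliding window)
 */
public class TumblingWindowDemo { // class name is a placeholder; the original name is not preserved
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumption: socket source with placeholder host and port
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        source.map(new MapFunction<String, CarInfo>() {
                    @Override
                    public CarInfo map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new CarInfo(Integer.parseInt(split[0]), Integer.parseInt(split[1]), LocalDateTime.now());
                    }
                })
                .keyBy(CarInfo::getLightId)
                // Counts once per minute; requirement 1 as stated would use Time.seconds(5)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum("carNum")
                .print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CarInfo {
        // traffic-light id
        private int lightId;
        // number of cars that passed this traffic light
        private int carNum;
        // time at which the record was read
        private LocalDateTime time;
    }
}
```
The sliding-window version uses the same input, mapping, and `CarInfo` class, and differs only in the window assigner:
```java
        // requires org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
                .keyBy(CarInfo::getLightId)
                // Every 10 seconds, count the data of the last 20 seconds
                .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
                .sum("carNum")
                .print();
```
🚀 Window Functions
🏖️ ReduceFunction
```java
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            // Sum the second field of all tuples in the window
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });
```
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ReduceFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumption: socket source with placeholder host and port; input lines look like "id,time,num"
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        source.map((MapFunction<String, ReduceInfo>) value -> {
                    String[] split = value.split(",");
                    return new ReduceInfo(
                            Integer.parseInt(split[0]),
                            Long.parseLong(split[1]),
                            Integer.parseInt(split[2]));
                })
                .keyBy(ReduceInfo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // Called once per incoming element after the first one in each window
                .reduce((ReduceInfo value1, ReduceInfo value2) -> {
                    System.out.println("reduce called: " + value1 + " " + value2);
                    return new ReduceInfo(value1.getId(), value1.getTime(), value1.getNum() + value2.getNum());
                })
                .print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ReduceInfo {
        private int id;
        private long time;
        private int num;
    }
}
```
🏝️ AggregateFunction
```java
/**
 * The accumulator holds a running sum and a count.
 * getResult computes the average from the two.
 */
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}
```
```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AggregateFunctionDemo { // class name is a placeholder; the original name is not preserved
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumption: socket source with placeholder host and port; input lines look like "id,time,num"
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        source.map((MapFunction<String, ReduceFunctionDemo.ReduceInfo>) value -> {
                    String[] split = value.split(",");
                    return new ReduceFunctionDemo.ReduceInfo(
                            Integer.parseInt(split[0]),
                            Long.parseLong(split[1]),
                            Integer.parseInt(split[2]));
                })
                .keyBy(ReduceFunctionDemo.ReduceInfo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .aggregate(new AggregateFunction<ReduceFunctionDemo.ReduceInfo, Integer, Integer>() {
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("accumulator created");
                        return 0;
                    }

                    @Override
                    public Integer add(ReduceFunctionDemo.ReduceInfo value, Integer accumulator) {
                        System.out.println("add called: " + value + " " + accumulator);
                        return value.getNum() + accumulator;
                    }

                    @Override
                    public Integer getResult(Integer accumulator) {
                        System.out.println("getResult called: " + accumulator);
                        return accumulator;
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        System.out.println("merge called: " + a + " " + b);
                        // Merging two partial sums must add them; returning 0 would drop data
                        return a + b;
                    }
                })
                .print();
        env.execute();
    }
}
```
🏜️ ProcessWindowFunction
```java
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }
}
```
```java
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new MyProcessWindowFunction());

public class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        // Count the elements buffered for this key and window
        long count = 0;
        for (Tuple2<String, Long> in : input) {
            count++;
        }
        out.collect("Window: " + context.window() + "count: " + count);
    }
}
```
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ProcessWindowFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumption: socket source with placeholder host and port; input lines look like "id,time,num"
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        source.map((MapFunction<String, ProcessInfo>) value -> {
                    String[] split = value.split(",");
                    return new ProcessInfo(
                            Integer.parseInt(split[0]),
                            Long.parseLong(split[1]),
                            Integer.parseInt(split[2]));
                })
                .keyBy(ProcessInfo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<ProcessInfo, Integer, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key,
                                        ProcessWindowFunction<ProcessInfo, Integer, Integer, TimeWindow>.Context context,
                                        Iterable<ProcessInfo> elements,
                                        Collector<Integer> out) throws Exception {
                        System.out.println("process called: key:" + key + "\n" + "elements:" + elements);
                        // All elements of the window are available at once
                        int sum = 0;
                        for (ProcessInfo element : elements) {
                            sum += element.getNum();
                        }
                        out.collect(sum);
                    }
                })
                .print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ProcessInfo {
        private int id;
        private long time;
        private int num;
    }
}
```
🏔️ Incremental aggregation with ReduceFunction
```java
DataStream<SensorReading> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        // Keep the reading with the smaller value
        return r1.value() > r2.value() ? r2 : r1;
    }
}

private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
    public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
        // The iterable holds exactly one element: the pre-aggregated minimum
        SensorReading min = minReadings.iterator().next();
        out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
    }
}
```
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ReduceProcessDemo { // class name is a placeholder; the original name is not preserved
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumption: socket source with placeholder host and port; input lines look like "id,time,num"
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        source.map(new MapFunction<String, ReduceProcessInfo>() {
                    @Override
                    public ReduceProcessInfo map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new ReduceProcessInfo(
                                Integer.parseInt(split[0]),
                                Long.parseLong(split[1]),
                                Integer.parseInt(split[2]));
                    }
                })
                .keyBy(ReduceProcessInfo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction(), new MyProcessFunction())
                .print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ReduceProcessInfo {
        private int id;
        private long time;
        private int num;
    }

    // Assumption: the output type and the emitted tuple are not preserved in the source;
    // they mirror the SensorReading snippet above (window start plus reduced element).
    public static class MyProcessFunction
            extends ProcessWindowFunction<ReduceProcessInfo, Tuple2<Long, ReduceProcessInfo>, Integer, TimeWindow> {
        @Override
        public void process(Integer key,
                            ProcessWindowFunction<ReduceProcessInfo, Tuple2<Long, ReduceProcessInfo>, Integer, TimeWindow>.Context context,
                            Iterable<ReduceProcessInfo> elements,
                            Collector<Tuple2<Long, ReduceProcessInfo>> out) throws Exception {
            System.out.println("process called: key:" + key + "\n" + "elements:" + elements);
            ReduceProcessInfo next = elements.iterator().next();
            out.collect(new Tuple2<>(context.window().getStart(), next));
        }
    }

    public static class MyReduceFunction implements ReduceFunction<ReduceProcessInfo> {
        @Override
        public ReduceProcessInfo reduce(ReduceProcessInfo value1, ReduceProcessInfo value2) throws Exception {
            System.out.println("reduce called: " + value1 + " " + value2);
            return value1.num
```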
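The point of passing both a reduce function and a window function is that the window keeps only one pre-aggregated value, while the window function still gets access to window metadata when the window fires. The plain-Java sketch below mimics that division of labour without Flink. `IncrementalWindowSketch`, `ReducingWindow`, `add`, and `fire` are hypothetical names chosen for illustration, and the single-window, single-key setup is a simplifying assumption.

```java
import java.util.function.BinaryOperator;

// Hypothetical illustration, not Flink code: one window that aggregates incrementally.
class IncrementalWindowSketch {

    /** Holds a single reduced value for one window instead of buffering every element. */
    static final class ReducingWindow<T> {
        private final long windowStart;
        private final BinaryOperator<T> reducer;
        private T reduced; // the only per-window state that is kept

        ReducingWindow(long windowStart, BinaryOperator<T> reducer) {
            this.windowStart = windowStart;
            this.reducer = reducer;
        }

        /** Plays the role of the reduce step: folds each arriving element into the state. */
        void add(T element) {
            reduced = (reduced == null) ? element : reducer.apply(reduced, element);
        }

        /** Plays the role of the window function: sees window metadata plus one element. */
        String fire() {
            return "window " + windowStart + " -> " + reduced;
        }
    }

    public static void main(String[] args) {
        ReducingWindow<Integer> window = new ReducingWindow<>(0L, Integer::min);
        for (int value : new int[]{7, 3, 9}) {
            window.add(value);
        }
        System.out.println(window.fire()); // window 0 -> 3
    }
}
```

However many elements arrive, the state stays at one value per window, which is the memory advantage over a plain ProcessWindowFunction that buffers all elements until the window fires.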