Flink Windows from Theory to Practice | Big Data Technology

06-15, 1010 views

Author: 小叮当撩代码, rising-star creator in the CSDN backend field | Alibaba Cloud expert blogger

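Contents

    • Flink Windows
      • 😍 Windows
        • 😎 Concept
        • 🐯 Window control attributes
        • 🕹️ Skeleton of a window program
        • ⏰ Window lifecycle
        • ⌨️ Window categories
        • 💿 Time-based sliding and tumbling windows
          • 📲 Tumbling window (TumblingWindow) concept
          • 💸 Sliding window (SlidingWindow) concept
          • 💡 Session windows
          • 🩷 Hands-on code
          • 🚀 Window Functions
            • 🚦 Concept
            • 🏖️ ReduceFunction
            • 🏝️ AggregateFunction
            • 🏜️ ProcessWindowFunction
            • ⛰️ ProcessWindowFunction with incremental aggregation
              • 🏔️ Incremental aggregation with ReduceFunction
              • 🗻 Incremental aggregation with AggregateFunction
              • 🧡 Triggers
              • 💛 Evictors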

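Flink Windows

😍 Windows

😎 Concept

Flink treats batch as a special case of streaming: its underlying engine is a streaming engine, and both stream processing and batch processing are implemented on top of it. Windows are the bridge from streaming to batch, and Flink provides a comprehensive windowing mechanism.

In Flink, a window is better understood as a bucket than as a frame. Flink slices the stream into buckets of finite size and assigns each record to its corresponding bucket; once the condition that triggers the window computation is met, the data in the bucket is processed.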
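To make the bucket picture concrete, here is a minimal plain-Java sketch with no Flink dependency. It only maps timestamps to fixed-size buckets. The class name `WindowBuckets`, its methods, and the sample timestamps are illustrative assumptions; Flink's real window assigners also deal with offsets, triggers, and state, so treat this as an approximation of the idea rather than Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of the Flink API.
final class WindowBuckets {

    /** Start of the fixed-size bucket that contains the timestamp (both in milliseconds). */
    static long bucketStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Groups timestamps into buckets, keyed by bucket start. */
    static Map<Long, List<Long>> assign(long[] timestampsMs, long sizeMs) {
        Map<Long, List<Long>> buckets = new TreeMap<>();
        for (long ts : timestampsMs) {
            buckets.computeIfAbsent(bucketStart(ts, sizeMs), k -> new ArrayList<>()).add(ts);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Four records, 5-second buckets
        System.out.println(assign(new long[] {1_000, 4_999, 5_000, 12_345}, 5_000));
        // prints {0=[1000, 4999], 5000=[5000], 10000=[12345]}
    }
}
```

Each record lands in exactly one bucket here, which corresponds to the non-overlapping case.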

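🐯 Window control attributes

A window has two control attributes: its length and its interval.

Window length (size): determines how much recent data each computation covers.

Window interval: determines how often a computation runs.

Example: compute the trending search terms of the last 24 hours every 5 minutes. The length is 24 hours and the interval is 5 minutes.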
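When the interval is shorter than the length, windows overlap and one record belongs to several of them. The plain-Java sketch below lists the windows that contain a given timestamp. The class name `SlidingWindowSketch` and the method `windowsFor` are assumptions made for this illustration, and the loop is a simplified model with no offset handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Flink API.
final class SlidingWindowSketch {

    /** Returns every [start, end) window of the given size and slide that contains ts. */
    static List<long[]> windowsFor(long ts, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is not after ts
        long lastStart = ts - Math.floorMod(ts, slideMs);
        // Walk backwards while the window still covers ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000;
        long fiveMinMs = 5L * 60 * 1000;
        // With a 24-hour length and a 5-minute interval, each record is counted in 288 windows
        System.out.println(windowsFor(123_456_789L, dayMs, fiveMinMs).size());
    }
}
```

With the numbers from the example, 24 hours divided by 5 minutes gives 288 overlapping windows per record.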

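🕹️ Skeleton of a window program

The rough skeleton of a Flink window application looks like this:

Keyed Window

    // Keyed Window
    stream
           .keyBy(...)               <-  keyed versus non-keyed windows
           .window(...)              <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"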
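    // Session windows with a dynamic gap: the extractor is called for each element
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return the session gap
    }))

    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return the session gap
    }))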
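The session-gap callbacks above decide, per element, how long a pause may last before a session ends. The following plain-Java sketch models that idea without Flink: each element opens a span of `[ts, ts + gap)`, and spans that overlap are merged into one session. The class `SessionSketch`, the gap function, and the sample timestamps are assumptions for illustration; how an element that lands exactly on the boundary is treated is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical helper for illustration only; not part of the Flink API.
final class SessionSketch {

    /** Splits timestamps into sessions; gapFor returns the allowed pause after each element. */
    static List<List<Long>> sessions(long[] timestampsMs, LongUnaryOperator gapFor) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long sessionEnd = Long.MIN_VALUE;
        for (long ts : sorted) {
            // The element arrives after the current session has ended: close it
            if (!current.isEmpty() && ts > sessionEnd) {
                result.add(current);
                current = new ArrayList<>();
                sessionEnd = Long.MIN_VALUE;
            }
            current.add(ts);
            // Each element may extend the session by its own gap
            sessionEnd = Math.max(sessionEnd, ts + gapFor.applyAsLong(ts));
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed rule: early elements tolerate a 5 s pause, later ones only 1 s
        LongUnaryOperator gap = ts -> ts < 10_000 ? 5_000 : 1_000;
        System.out.println(sessions(new long[] {1_000, 2_000, 9_000, 20_000, 20_500, 22_000}, gap));
        // prints [[1000, 2000], [9000], [20000, 20500], [22000]]
    }
}
```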
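    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Assumed for this listing: class name, socket source, host, and port.
    public class NoWindowDemo {
        /**
         * The input data represents:
         * traffic-light id and the number of cars that passed that light
         * 9,3
         * 9,2
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
            source.map(new MapFunction<String, CarInfo>() {
                        @Override
                        public CarInfo map(String value) throws Exception {
                            String[] split = value.split(",");
                            return new CarInfo(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
                        }
                    })
                    // No window: group by light id and keep a running total per key
                    .keyBy(CarInfo::getLightId)
                    .sum("carNum")
                    .print();
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CarInfo {
            // Traffic-light id
            private int lightId;
            // Number of cars that passed this light
            private int carNum;
        }
    }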
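The job above has no window, so `sum` on the keyed stream emits an updated running total for every incoming record. Here is a small plain-Java sketch of that behaviour; `RollingSumSketch` and `rollingSum` are names assumed for the illustration, and the output format is simplified to `lightId,total`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Flink API.
final class RollingSumSketch {

    /** For each "lightId,carNum" line, emits "lightId,runningTotal" for that light. */
    static List<String> rollingSum(List<String> lines) {
        Map<Integer, Integer> totals = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String[] split = line.split(",");
            int lightId = Integer.parseInt(split[0].trim());
            int carNum = Integer.parseInt(split[1].trim());
            // One result per input record, never a final "closed" result
            int total = totals.merge(lightId, carNum, Integer::sum);
            out.add(lightId + "," + total);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(rollingSum(Arrays.asList("9,3", "9,2")));
        // prints [9,3, 9,5]
    }
}
```

Because the total never resets, a question such as "how many cars in the last 5 seconds" needs a window on top of the keyed stream.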
                
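    import java.time.LocalDateTime;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Assumed for this listing: class name, socket source, host, and port.
    public class TumblingWindowDemo {
        /**
         * The input data represents:
         * traffic-light id and the number of cars that passed that light
         * 9,3
         * 9,2
         * 9,7
         * 4,9
         * 2,6
         * 1,5
         * 2,3
         * 5,7
         * 5,4
         * Requirement 1: every 5 seconds, count the cars that passed each light in the last 5 seconds (time-based tumbling window)
         * Requirement 2: every 5 seconds, count the cars that passed each light in the last 10 seconds (time-based sliding window)
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
            source.map(new MapFunction<String, CarInfo>() {
                        @Override
                        public CarInfo map(String value) throws Exception {
                            String[] split = value.split(",");
                            return new CarInfo(Integer.parseInt(split[0]), Integer.parseInt(split[1]), LocalDateTime.now());
                        }
                    })
                    .keyBy(CarInfo::getLightId)
                    // Compute once per minute; use Time.seconds(5) to match requirement 1
                    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                    .sum("carNum")
                    .print();
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CarInfo {
            // Traffic-light id
            private int lightId;
            // Number of cars that passed this light
            private int carNum;
            // Time at which the record was received
            private LocalDateTime time;
        }
    }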
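As a rough model of what the tumbling-window job computes, the plain-Java sketch below sums `carNum` per light and per window. The arrival times are invented for the example (the real job uses processing time), and `TumblingSumSketch` with its `"windowStart/lightId"` result key is an assumption of this sketch, not something Flink produces.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of the Flink API.
final class TumblingSumSketch {

    /** events[i] = {arrivalMs, lightId, carNum}; returns "windowStart/lightId" -> total cars. */
    static Map<String, Integer> sumPerWindow(long[][] events, long sizeMs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (long[] e : events) {
            long windowStart = e[0] - Math.floorMod(e[0], sizeMs);
            String slot = windowStart + "/" + e[1];
            // Each record contributes to exactly one window of its key
            sums.merge(slot, (int) e[2], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] events = {
            {1_000, 9, 3}, {2_000, 9, 2}, {4_000, 4, 9},   // first 5 s window
            {6_000, 9, 7}, {7_000, 2, 6}                    // second 5 s window
        };
        System.out.println(sumPerWindow(events, 5_000));
    }
}
```

Unlike the running total of a plain keyed `sum`, every window starts from zero, so light 9 reports 5 cars for the first window and 7 for the second.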
                
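    import java.time.LocalDateTime;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Assumed for this listing: class name, socket source, host, and port.
    public class SlidingWindowDemo {
        /**
         * The input data represents:
         * traffic-light id and the number of cars that passed that light
         * 9,3
         * 9,2
         * 9,7
         * 4,9
         * 2,6
         * 1,5
         * 2,3
         * 5,7
         * 5,4
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
            source.map(new MapFunction<String, CarInfo>() {
                        @Override
                        public CarInfo map(String value) throws Exception {
                            String[] split = value.split(",");
                            return new CarInfo(Integer.parseInt(split[0]), Integer.parseInt(split[1]), LocalDateTime.now());
                        }
                    })
                    .keyBy(CarInfo::getLightId)
                    // Every 10 seconds, compute over the data of the last 20 seconds
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
                    .sum("carNum")
                    .print();
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CarInfo {
            // Traffic-light id
            private int lightId;
            // Number of cars that passed this light
            private int carNum;
            // Time at which the record was received
            private LocalDateTime time;
        }
    }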
                
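    // ReduceFunction fragment: sums the second field of all tuples in a window
    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
          public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
          }
        });

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class ReduceFunctionDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Assumed for this listing: socket source, host, and port. Each line is "id,time,num".
            DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
            source.map(value -> {
                        String[] split = value.split(",");
                        return new ReduceInfo(
                                Integer.parseInt(split[0])
                                , Long.parseLong(split[1])
                                , Integer.parseInt(split[2]));
                    })
                    .keyBy(ReduceInfo::getId)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    // Combine two records of the same key into one; called incrementally as records arrive
                    .reduce((ReduceInfo value1, ReduceInfo value2) -> {
                        System.out.println("reduce called: " + value1 + "  " + value2);
                        return new ReduceInfo(value1.getId()
                                , value1.getTime()
                                , value1.getNum() + value2.getNum());
                    })
                    .print();
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class ReduceInfo {
            // id
            private int id;
            // time
            private long time;
            // num
            private int num;
        }
    }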
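A reduce-style window function keeps a single value per window and folds each new record into it. The sketch below models that in plain Java and counts how often the combining function runs. `IncrementalReduce` is a name invented for this illustration, and it stands in for window state that Flink manages internally.

```java
import java.util.function.BinaryOperator;

// Hypothetical helper for illustration only; not part of the Flink API.
final class IncrementalReduce<T> {
    private final BinaryOperator<T> fn;
    private T state;      // the only value kept for the window
    private int calls;    // how many times fn was invoked

    IncrementalReduce(BinaryOperator<T> fn) {
        this.fn = fn;
    }

    /** Folds one more element into the window state. */
    void add(T element) {
        if (state == null) {
            // The first element is stored as-is; there is nothing to combine yet
            state = element;
        } else {
            state = fn.apply(state, element);
            calls++;
        }
    }

    T result() {
        return state;
    }

    int calls() {
        return calls;
    }

    public static void main(String[] args) {
        IncrementalReduce<Integer> window = new IncrementalReduce<>(Integer::sum);
        window.add(3);
        window.add(2);
        window.add(7);
        System.out.println(window.result() + " after " + window.calls() + " reduce calls");
        // prints 12 after 2 reduce calls
    }
}
```

This is why the demo's "reduce called" message appears one time fewer than the number of records in a window, and why input and output must share the same type.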
                
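    // AggregateFunction fragment: the accumulator holds (sum, count); the result is the average
    private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
      @Override
      public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
      }
      @Override
      public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
      }
      @Override
      public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
      }
      @Override
      public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
      }
    }

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Assumed for this listing: class name, socket source, host, and port.
    public class AggregateFunctionDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
            source.map(value -> {
                        String[] split = value.split(",");
                        return new ReduceInfo(
                                Integer.parseInt(split[0])
                                , Long.parseLong(split[1])
                                , Integer.parseInt(split[2]));
                    })
                    .keyBy(ReduceInfo::getId)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    // Type parameters: input record, accumulator, result
                    .aggregate(new AggregateFunction<ReduceInfo, Integer, Integer>() {
                        @Override
                        public Integer createAccumulator() {
                            System.out.println("creating accumulator");
                            return 0;
                        }
                        @Override
                        public Integer add(ReduceInfo value, Integer accumulator) {
                            System.out.println("add called: " + value + "  " + accumulator);
                            return value.getNum() + accumulator;
                        }
                        @Override
                        public Integer getResult(Integer accumulator) {
                            System.out.println("getResult called: " + accumulator);
                            return accumulator;
                        }
                        @Override
                        public Integer merge(Integer a, Integer b) {
                            // Only used by merging windows such as session windows; must combine both partial sums
                            System.out.println("merge called: " + a + "  " + b);
                            return a + b;
                        }
                    })
                    .print();
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class ReduceInfo {
            // id
            private int id;
            // time
            private long time;
            // num
            private int num;
        }
    }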
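The four methods of an aggregate-style function can be tried out without a cluster. The plain-Java sketch below mirrors the sum-and-count accumulator idea for computing an average; `AverageSketch` and its nested `Acc` class are names assumed for this illustration, and the static methods only imitate the roles of `createAccumulator`, `add`, `getResult`, and `merge`.

```java
// Hypothetical helper for illustration only; not part of the Flink API.
final class AverageSketch {

    /** Accumulator: running sum and element count. */
    static final class Acc {
        long sum;
        long count;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    /** Folds one value into the accumulator. */
    static Acc add(long value, Acc acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    /** Combines two partial accumulators, for example when two windows are merged. */
    static Acc merge(Acc a, Acc b) {
        Acc merged = new Acc();
        merged.sum = a.sum + b.sum;
        merged.count = a.count + b.count;
        return merged;
    }

    /** Turns the accumulator into the final result. */
    static double getResult(Acc acc) {
        return (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc left = add(2, add(1, createAccumulator()));
        Acc right = add(5, add(4, add(3, createAccumulator())));
        System.out.println(getResult(merge(left, right)));
        // prints 3.0
    }
}
```

The accumulator type can differ from both the input and the output type, which is the main thing an aggregate function offers over a reduce function. A `merge` that discards its inputs would silently lose data whenever windows are merged.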
                
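    public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
        /**
         * Evaluates the window and outputs none or several elements.
         *
         * @param key The key for which this window is evaluated.
         * @param context The context in which the window is being evaluated.
         * @param elements The elements in the window being evaluated.
         * @param out A collector for emitting elements.
         *
         * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
         */
        public abstract void process(
                KEY key,
                Context context,
                Iterable<IN> elements,
                Collector<OUT> out) throws Exception;
        /**
         * The context holding window metadata.
         */
        public abstract class Context implements java.io.Serializable {
            /**
             * Returns the window that is being evaluated.
             */
            public abstract W window();
            /** Returns the current processing time. */
            public abstract long currentProcessingTime();
            /** Returns the current event-time watermark. */
            public abstract long currentWatermark();
            /**
             * State accessor for per-key and per-window state.
             */
            public abstract KeyedStateStore windowState();
            /**
             * State accessor for per-key global state.
             */
            public abstract KeyedStateStore globalState();
        }
    }

    // Counts the elements of each window and emits the count together with the window
    public class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
      @Override
      public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        long count = 0;
        for (Tuple2<String, Long> in : input) {
          count++;
        }
        out.collect("Window: " + context.window() + "count: " + count);
      }
    }

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class ProcessWindowFunctionDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Assumed for this listing: socket source, host, and port. Each line is "id,time,num".
            DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
            source.map(value -> {
                        String[] split = value.split(",");
                        return new ProcessWindowFunctionDemo.ProcessInfo(
                                Integer.parseInt(split[0])
                                , Long.parseLong(split[1])
                                , Integer.parseInt(split[2]));
                    })
                    .keyBy(ProcessWindowFunctionDemo.ProcessInfo::getId)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    // Type parameters: input, output, key, window
                    .process(new ProcessWindowFunction<ProcessInfo, Integer, Integer, TimeWindow>() {
                        @Override
                        public void process(Integer key
                                , Context context
                                , Iterable<ProcessInfo> elements
                                , Collector<Integer> out) throws Exception {
                            // Called once per key and window, with all buffered elements
                            System.out.println("process called: key:" + key + "\n" + "elements:" + elements);
                            int sum = 0;
                            for (ProcessInfo element : elements) {
                                sum += element.getNum();
                            }
                            out.collect(sum);
                        }
                    })
                    .print();
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class ProcessInfo {
            // id
            private int id;
            // time
            private long time;
            // num
            private int num;
        }
    }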
                
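    // Incremental aggregation fragment: the ReduceFunction keeps the smallest reading,
    // the ProcessWindowFunction attaches the window start time to it
    private static class MyReduceFunction implements ReduceFunction<SensorReading> {
      public SensorReading reduce(SensorReading r1, SensorReading r2) {
          return r1.value() > r2.value() ? r2 : r1;
      }
    }
    private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
      public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
          SensorReading min = minReadings.iterator().next();
          out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
      }
    }

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Assumed for this listing: class name, socket source, host, port, and the Tuple2 output type.
    public class ReduceProcessDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
            source.map(new MapFunction<String, ReduceProcessInfo>() {
                        @Override
                        public ReduceProcessInfo map(String value) throws Exception {
                            String[] split = value.split(",");
                            return new ReduceProcessInfo(
                                    Integer.parseInt(split[0])
                                    , Long.parseLong(split[1])
                                    , Integer.parseInt(split[2]));
                        }
                    })
                    .keyBy(ReduceProcessInfo::getId)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    // Pre-aggregate with the ReduceFunction, then hand the single result to the ProcessWindowFunction
                    .reduce(new MyReduceFunction(), new MyProcessFunction())
                    .print();
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class ReduceProcessInfo {
            // id
            private int id;
            // time
            private long time;
            // num
            private int num;
        }
        public static class MyProcessFunction
                extends ProcessWindowFunction<ReduceProcessInfo, Tuple2<Long, ReduceProcessInfo>, Integer, TimeWindow> {
            @Override
            public void process(Integer key, Context context, Iterable<ReduceProcessInfo> elements
                    , Collector<Tuple2<Long, ReduceProcessInfo>> out) throws Exception {
                System.out.println("process called: key:" + key + "\n" + "elements:" + elements);
                // elements holds exactly one item: the pre-aggregated result
                ReduceProcessInfo next = elements.iterator().next();
                out.collect(new Tuple2<>(context.window().getStart(), next));
            }
        }
        public static class MyReduceFunction implements ReduceFunction<ReduceProcessInfo> {
            @Override
            public ReduceProcessInfo reduce(ReduceProcessInfo value1, ReduceProcessInfo value2) throws Exception {
                System.out.println("reduce called: " + value1 + "  " + value2);
                // Assumed comparison: keep the record with the smaller num, mirroring the minimum-reading fragment
                return value1.num < value2.num ? value1 : value2;
            }
        }
    }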
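Combining an incremental function with a process-style function means that only one pre-aggregated value is kept per window, while the final step still sees window metadata. The plain-Java sketch below walks through both steps for a single window. All names (`ReduceThenProcessSketch`, `reduce`, `process`, `evaluate`) and the `(windowStart,min)` output format are assumptions made for this illustration.

```java
import java.util.Collections;

// Hypothetical helper for illustration only; not part of the Flink API.
final class ReduceThenProcessSketch {

    /** Incremental step: keeps the smaller of two readings. */
    static long reduce(long a, long b) {
        return Math.min(a, b);
    }

    /** Final step: receives the single pre-aggregated value plus the window start. */
    static String process(long windowStart, Iterable<Long> preAggregated) {
        long min = preAggregated.iterator().next();
        return "(" + windowStart + "," + min + ")";
    }

    /** Drives both steps for the readings of one window (at least one reading is required). */
    static String evaluate(long windowStart, long[] readings) {
        long state = readings[0];
        for (int i = 1; i < readings.length; i++) {
            state = reduce(state, readings[i]);   // state never grows beyond one value
        }
        return process(windowStart, Collections.singletonList(state));
    }

    public static void main(String[] args) {
        System.out.println(evaluate(10_000, new long[] {7, 3, 9}));
        // prints (10000,3)
    }
}
```

Compared with a process-only function, which buffers every element until the window fires, this pattern keeps memory use per window constant.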