flink多流操作(connect cogroup union broadcast)

03-01 1009阅读

flink多流操作

  • 1 分流操作
  • 2 connect连接操作
    • 2.1 connect 连接(DataStream,DataStream→ConnectedStreams)
    • 2.2 coMap(ConnectedStreams → DataStream)
    • 2.3 coFlatMap(ConnectedStreams → DataStream)
    • 3 union操作
      • 3.1 union 合并(DataStream * → DataStream)
      • 4 coGroup 协同分组
        • 4.1 coGroup 实现 left join操作
        • 5 join
        • 6 broadcast 广播
          • 6.1 API 介绍 , 核心要点

            1 分流操作

            SingleOutputStreamOperator mainStream = students.process(new ProcessFunction() {
                @Override
                public void processElement(Student student, ProcessFunction.Context ctx, Collector collector) throws Exception {
                    if (student.getGender().equals("m")) {
                        // 输出到测流
                        ctx.output(maleOutputTag, student);
                    } else if (student.getGender().equals("f")) {
                        // 输出到测流
                        ctx.output(femaleOutputTag, student.toString());
                    } else {
                        // 在主流中输出
                        collector.collect(student);
                    }
                }
            });
            SingleOutputStreamOperator side1 = mainStream.getSideOutput(maleOutputTag);
            SingleOutputStreamOperator side2 = mainStream.getSideOutput(femaleOutputTag);
            

            2 connect连接操作

            2.1 connect 连接(DataStream,DataStream→ConnectedStreams)

            connect 翻译成中文意为连接,可以将两个数据类型一样也可以类型不一样 DataStream 连接成一个新 的 ConnectedStreams。需要注意的是,connect 方法与 union 方法不同,虽然调用 connect 方法将两个 流连接成一个新的 ConnectedStreams,但是里面的两个流依然是相互独立的,这个方法最大的好处是 可以让两个流共享 State 状态。

            // 使用 fromElements 创建两个 DataStream
            DataStreamSource word = env.fromElements("a", "b", "c", "d");
            DataStreamSource num = env.fromElements(1, 3, 5, 7, 9);
            // 将两个 DataStream 连接到一起
            ConnectedStreams connected = word.connect(num);
            

            2.2 coMap(ConnectedStreams → DataStream)

            对 ConnectedStreams 调用 map 方法时需要传入 CoMapFunction 函数;

            该接口需要指定 3 个泛型:

            1. 第一个输入 DataStream 的数据类型
            2. 第二个输入 DataStream 的数据类型
            3. 返回结果的数据类型。

              该接口需要重写两个方法:

            4. map1 方法,是对第 1 个流进行 map 的处理逻辑。
            5. 2 map2 方法,是对 2 个流进行 map 的处理逻辑

            这两个方法必须是相同的返回值类型。

            //将两个 DataStream 连接到一起
            ConnectedStreams wordAndNum = word.connect(num);
            // 对 ConnectedStreams 中两个流分别调用个不同逻辑的 map 方法
            DataStream result = wordAndNum.map(new CoMapFunction() {
                @Override
                public String map1(String value) throws Exception {
                    // 第一个 map 方法是将第一个流的字符变大写
                    return value.toUpperCase();
                }
                @Override
                public String map2(Integer value) throws Exception {
                    // 第二个 map 方法将是第二个流的数字乘以 10 并转成 String
                    return String.valueOf(value * 10);
                }
            });
            

            2.3 coFlatMap(ConnectedStreams → DataStream)

            对 ConnectedStreams 调用 flatMap 方法。调用 flatMap 方法,传入的 Function 是 CoFlatMapFunction;

            这个接口要重写两个方法:

            1. flatMap1 方法,是对第 1 个流进行 flatMap 的处理逻辑;
            2. flatMap2 方法,是对 2 个流进行 flatMap 的处理逻辑;

            这两个方法都必须返回是相同的类型。

            // 使用 fromElements 创建两个 DataStream
            DataStreamSource word = env.fromElements("a b c", "d e f");
            DataStreamSource num = env.fromElements("1,2,3", "4,5,6");
            // 将两个 DataStream 连接到一起
            ConnectedStreams connected = word.connect(num);
            // 对 ConnectedStreams 中两个流分别调用个不同逻辑的 flatMap 方法
            DataStream result = connected.flatMap(new CoFlatMapFunction() {
                @Override
                public void flatMap1(String value, Collector out) throws Exception {
                    String[] words = value.split(" ");
                    for (String w : words) {
                        out.collect(w);
                    }
                }
                @Override
                public void flatMap2(String value, Collector out) throws Exception {
                    String[] nums = value.split(",");
                    for (String n : nums) {
                        out.collect(n);
                    }
                }
            });
            

            3 union操作

            3.1 union 合并(DataStream * → DataStream)

            该方法可以将两个或者多个数据类型一致的 DataStream 合并成一个 DataStream。DataStream union(DataStream… streams)可以看出 DataStream 的 union 方法的参数为可变参数,即可以合并两 个或多个数据类型一致的 DataStream,connect 不要求两个流的类型一致,但union必须一致。

            下面的例子是使用 fromElements 生成两个 DataStream,一个是基数的,一个是偶数的,然后将两个 DataStream 合并成一个 DataStream。

            // 使用 fromElements 创建两个 DataStream
            DataStreamSource odd = env.fromElements(1, 3, 5, 7, 9);
            DataStreamSource even = env.fromElements(2, 4, 6, 8, 10);
            // 将两个 DataStream 合并到一起
            DataStream result = odd.union(even);
            

            4 coGroup 协同分组

            coGroup 本质上是join 算子的底层算子;功能类似;可以用cogroup来实现join left join full join的功能。 代码结构如下:

            DataStreamSource stream1 = env.fromElements("1,aa,m,18", "2,bb,m,28", "3,cc,f,38");
            DataStreamSource stream2 = env.fromElements("1:aa:m:18", "2:bb:m:28", "3:cc:f:38");
            DataStream res = stream1
                .coGroup(stream2)
                .where(new KeySelector() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .equalTo(new KeySelector() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction() {
                    @Override
                    public void coGroup(Iterable first, Iterable second, Collector out) throws Exception {
                        // 这里添加具体的 coGroup 处理逻辑
                       // 这两个迭代器,是这5s的数据中的某一组,id = 1
                    }
                });
            

            4.1 coGroup 实现 left join操作

            package batch;
            import org.apache.flink.api.common.functions.CoGroupFunction;
            import org.apache.flink.api.common.functions.FlatMapFunction;
            import org.apache.flink.api.common.typeinfo.TypeHint;
            import org.apache.flink.api.java.ExecutionEnvironment;
            import org.apache.flink.api.java.operators.DataSource;
            import org.apache.flink.api.java.tuple.Tuple2;
            import org.apache.flink.api.java.tuple.Tuple3;
            import org.apache.flink.streaming.api.datastream.DataStream;
            import org.apache.flink.streaming.api.datastream.DataStreamSource;
            import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
            import org.apache.flink.streaming.api.windowing.time.Time;
            import org.apache.flink.util.Collector;
            public class coGrouptest {
                public static void main(String[] args) throws Exception {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //        id name
                    DataStreamSource stream1 = env.socketTextStream("localhost", 9998);
            //        id age
                    DataStreamSource stream2 = env.socketTextStream("localhost", 9999);
            // nc -lp 9999
            // nc -lp 9998
                    SingleOutputStreamOperator s1 = stream1.map(s -> {
                        String[] arr = s.split(",");
                        return Tuple2.of(arr[0], arr[1]);
                    }).returns(new TypeHint() {
                    });
                    SingleOutputStreamOperator s2 = stream2.map(s -> {
                        String[] arr = s.split(",");
                        return Tuple2.of(arr[0], arr[1]);
                    }).returns(new TypeHint() {
                    });
                    DataStream out = s1.coGroup(s2).where(tp -> tp.f0)  //左的f0 id 字段
                            .equalTo(tp -> tp.f0)  //又的f0 id 字段
                            .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                            .apply(new CoGroupFunction() {
                                @Override
                                public void coGroup(Iterable iterable, Iterable iterable1, Collector out) throws Exception {
                                    for (Tuple2 t1 : iterable) {
                                        boolean t2isnull = false;
                                        for (Tuple2 t2 : iterable1) {
                                            out.collect(new Tuple3(t1.f0,t1.f1,t2.f1));
                                            t2isnull = true;
                                        }
                                        if(!t2isnull){
                                            out.collect(new Tuple3(t1.f0,t1.f1,null));
                                        }
                                    }
                                }
                            });
                    out.print();
                    env.execute();
            }
            }
            

            5 join

            用于关联两个流(类似于 sql 中 join),需要指定 join,需要在窗口中进行关联后的逻辑计算。

            只能支持inner join 不支持 左右和全连接

            stream.join(otherStream)
                  .where()
                  .equalTo()
                  .window()
                  .apply();
            

            实例:

            SingleOutputStreamOperator s1;
            SingleOutputStreamOperator s2;
            // join 两个流,此时并没有具体的计算逻辑
            JoinedStreams joined = s1.join(s2);
            // 对 join 流进行计算处理
            DataStream stream = joined
                    // where 流 1 的某字段 equalTo 流 2 的某字段
                    .where(s -> s.getId()).equalTo(s -> s.getId())
                    // join 实质上只能在窗口中进行
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                    // 对窗口中满足关联条件的数据进行计算
                    .apply(new JoinFunction() {
                        // 这边传入的两个流的两条数据,是能够满足关联条件的
                        @Override
                        public String join(Student first, StuInfo second) throws Exception {
                            // first: 左流数据 ; second: 右流数据
                            // 计算逻辑
                            // 返回结果
                            return null;
                        }
                    });
            // 对 join 流进行计算处理
            joined.where(s -> s.getId()).equalTo(s -> s.getId())
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                    .apply(new FlatJoinFunction() {
                        @Override
                        public void join(Student first, StuInfo second, Collector out) throws Exception {
                            out.collect();
                        }
                    });
            

            6 broadcast 广播

            Broadcast State 是 Flink 1.5 引入的新特性。 在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到 另一个数据流的计算中 。

            flink多流操作(connect cogroup union broadcast)

            6.1 API 介绍 , 核心要点

            • 将需要广播出去的流,调用 broadcast 方法进行广播转换,得到广播流 BroadCastStream
            • 然后在主流上调用 connect 算子,来连接广播流(以实现广播状态的共享处理)
            • 在连接流上调用 process 算子,就会在同一个 ProcessFunciton 中提供两个方法分别对两个流进行 处理,并在这个 ProcessFunction 内实现“广播状态”的共享
              public class _16_BroadCast_Demo {
                  public static void main(String[] args) throws Exception {
                      Configuration configuration = new Configuration();
                      configuration.setInteger("rest.port", 8822);
                      
                      StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
                      env.setParallelism(1);
                      
                      // id,eventId
                      DataStreamSource stream1 = env.socketTextStream("localhost", 9998);
                      SingleOutputStreamOperator s1 = stream1.map(s -> {
                          String[] arr = s.split(",");
                          return Tuple2.of(arr[0], arr[1]);
                      }).returns(new TypeHint() { });
                      
                      // id,age,city
                      DataStreamSource stream2 = env.socketTextStream("localhost", 9999);
                      SingleOutputStreamOperator s2 = stream2.map(s -> {
                          String[] arr = s.split(",");
                          return Tuple3.of(arr[0], arr[1], arr[2]);
                      }).returns(new TypeHint() { });
                      
                      /**
                       * 案例背景:
                       * 流 1: 用户行为事件流(持续不断,同一个人也会反复出现,出现次数不定
                       * 流 2: 用户维度信息(年龄,城市),同一个人的数据只会来一次,来的时间也不定 (作为广播流)
                       * 需要加工流 1,把用户的维度信息填充好,利用广播流来实现
                       */
                      
                      // 将字典数据所在流: s2 , 转成 广播流
                      MapStateDescriptor userInfoStateDesc =
                              new MapStateDescriptor("userInfoStateDesc", TypeInformation.of(String.class),
                                      TypeInformation.of(new TypeHint() {}));
                      BroadcastStream s2BroadcastStream = s2.broadcast(userInfoStateDesc);
                      
                      // 哪个流处理中需要用到广播状态数据,就要 去 连接 connect 这个广播流
                      SingleOutputStreamOperator connected = s1.connect(s2BroadcastStream)
                              .process(new BroadcastProcessFunction() {
                                  /**BroadcastState broadcastState;*/
                                  
                                  /**
                                   * 本方法,是用来处理 主流中的数据(每来一条,调用一次)
                                   * @param element 左流(主流)中的一条数据
                                   * @param ctx 上下文
                                   * @param out 输出器
                                   * @throws Exception
                                   */
                                  @Override
                                  public void processElement(Tuple2 element,
              BroadcastProcessFunction.ReadOnlyContext ctx,
              Collector out) throws Exception {
                                      // 通过 ReadOnlyContext ctx 取到的广播状态对象,是一个 “只读 ” 的对象;
                                      ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(userInfoStateDesc);
                                      
                                      if (broadcastState != null) {
                                          Tuple2 userInfo = broadcastState.get(element.f0);
              						out.collect(element.f0 + "," + element.f1 + "," + (userInfo == null ? null : userInfo.f0) + "," + (userInfo == null ? null : userInfo.f1));
              						} else { out.collect(element.f0 + "," + element.f1 + "," + null + "," + null);
              						 }
              				 }
              				 /**** 
              				 * @param element 广播流中的一条数据 
              				 * @param ctx 上下文 
              				 * @param out 输出器 
              				 * @throws Exception 
              				 */ 
              				@Override 
              				public void processBroadcastElement(Tuple3 element, 
              				                                    BroadcastProcessFunction.Context ctx, 
              				                                    Collector out) throws Exception { 
              				    // 从上下文中,获取广播状态对象(可读可写的状态对象) 
              				    BroadcastState broadcastState = ctx.getBroadcastState(userInfoStateDesc); 
              				    // 然后将获得的这条广播流数据,拆分后,装入广播状态 
              				    broadcastState.put(element.f0, Tuple2.of(element.f1, element.f2)); 
              				}
              				
              				resultStream.print(); 
              				env.execute(); 
              				}
              				}
              
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]