Flink算子通用状态应用测试样例
Flink算子通用状态应用测试样例
(图片来源网络,侵删)
1. 获取Flink执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
2. 创建数据源,生成随机数据
DataStream source = env.addSource(new SourceFunction() { @Override public void run(SourceContext ctx) throws Exception { while (true) { HashMap hashMap = new HashMap(); hashMap.put("ID", new Random().nextInt(3) + 1 + ""); hashMap.put("AMT", "1"); System.out.println("------"); System.out.println("生产数据:" + hashMap); ctx.collect(hashMap); Thread.sleep(1000); } } @Override public void cancel() {} });
3. 按照ID和星期几进行分组
KeyedStream keyedStream = source.keyBy(new KeySelector() { @Override public String getKey(Map value) throws Exception { return value.get("ID") + LocalDate.now().getDayOfWeek(); } });
4. 处理函数,实现状态初始化和元素处理逻辑
SingleOutputStreamOperator process = keyedStream.process(new KeyedProcessFunction() { private AggregatingState aggState; @Override public void open(Configuration parameters) throws Exception { // 配置状态的TTL StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 仅在创建和写入时清除,另一个读和写时清除 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不退回过期值 .build(); // 初始化状态 AggregatingStateDescriptor aggRes = new AggregatingStateDescriptor("aggRes", new AggregateFunction() { @Override public Map createAccumulator() { return new HashMap(); } @Override public Map add(Map in, Map acc) { String amt = acc.get("AMT"); if (amt == null) { acc.put("ID", in.get("ID")); acc.put("AMT", in.get("AMT")); } else { acc.put("AMT", Integer.valueOf(in.get("AMT")) + Integer.valueOf(amt) + ""); } return acc; } @Override public Map getResult(Map acc) { return acc; } @Override public Map merge(Map a, Map b) { return null; } }, TypeInformation.of(new TypeHint() { })); aggRes.enableTimeToLive(ttlConfig); aggState = getRuntimeContext().getAggregatingState(aggRes); } @Override public void processElement(Map value, KeyedProcessFunction.Context ctx, Collector out) throws Exception { aggState.add(value); out.collect(aggState.get()); } });
5. 打印聚合结果
process.map((MapFunction) value -> { System.out.println("聚合结果:" + value); return null; });
6. 执行作业
env.execute("Flink Common State Test");
7. 执行结果
------ 生产数据:{AMT=1, ID=2} 聚合结果:{AMT=1, ID=2} ------ 生产数据:{AMT=1, ID=3} 聚合结果:{AMT=1, ID=3} ------ 生产数据:{AMT=1, ID=3} 聚合结果:{AMT=2, ID=3} ------ 生产数据:{AMT=1, ID=1} 聚合结果:{AMT=1, ID=1} ------ 生产数据:{AMT=1, ID=1} 聚合结果:{AMT=2, ID=1} ------ 生产数据:{AMT=1, ID=1} 聚合结果:{AMT=3, ID=1} ...
这段代码实现了一个 Flink 作业,生成随机数据并对数据进行状态聚合处理。其中包括数据源生成、按键分区、状态初始化、元素聚合处理和结果输出。可以作为多场景下通用的实时数据处理模型。
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。