This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
事件驱动应用 #
处理函数(Process Functions) #
简介 #
ProcessFunction 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction 十分相似, 但是增加了 Timer。
示例 #
如果你已经体验了 流式分析训练 的动手实践, 你应该记得,它是采用 TumblingEventTimeWindow 来计算每个小时内每个司机的小费总和, 像下面的示例这样:
// 计算每个司机每小时的小费总和 DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares .keyBy((TaxiFare fare) -> fare.driverId) .window(TumblingEventTimeWindows.of(Duration.ofHours(1))) .process(new AddTips()); 使用 KeyedProcessFunction 去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:
// 计算每个司机每小时的小费总和 DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares .keyBy((TaxiFare fare) -> fare.driverId) .process(new PseudoWindow(Duration.ofHours(1))); 在这个代码片段中,一个名为 PseudoWindow 的 KeyedProcessFunction 被应用于 KeyedStream, 其结果是一个 DataStream<Tuple3<Long, Long, Float>> (与使用 Flink 内置时间窗口的实现生成的流相同)。
PseudoWindow 的总体轮廓示意如下:
// 在时长跨度为一小时的窗口中计算每个司机的小费总和。 // 司机ID作为 key。 public static class PseudoWindow extends KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> { private final long durationMsec; public PseudoWindow(Time duration) { this.durationMsec = duration.toMilliseconds(); } @Override // 在初始化期间调用一次。 public void open(OpenContext ctx) { . . . } @Override // 每个票价事件(TaxiFare-Event)输入(到达)时调用,以处理输入的票价事件。 public void processElement( TaxiFare fare, Context ctx, Collector<Tuple3<Long, Long, Float>> out) throws Exception { . . . } @Override // 当当前水印(watermark)表明窗口现在需要完成的时候调用。 public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, Long, Float>> out) throws Exception { . . . } } 注意事项:
-
有几种类型的 ProcessFunctions – 不仅包括
KeyedProcessFunction,还包括CoProcessFunctions、BroadcastProcessFunctions等. -
KeyedProcessFunction是一种RichFunction。作为RichFunction,它可以访问使用 Managed Keyed State 所需的open和getRuntimeContext方法。 -
有两个回调方法须要实现:
processElement和onTimer。每个输入事件都会调用processElement方法; 当计时器触发时调用onTimer。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。 除此之外,processElement和onTimer都提供了一个上下文对象,该对象可用于与TimerService交互。 这两个回调还传递了一个可用于发出结果的Collector。
open() 方法 #
// 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。 // 每个司机都有一个单独的MapState对象。 private transient MapState<Long, Float> sumOfTips; @Override public void open(OpenContext ctx) { MapStateDescriptor<Long, Float> sumDesc = new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); sumOfTips = getRuntimeContext().getMapState(sumDesc); } 由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState 来支持处理这一点,该 MapState 将每个窗口的结束时间戳映射到该窗口的小费总和。
processElement() 方法 #
public void processElement( TaxiFare fare, Context ctx, Collector<Tuple3<Long, Long, Float>> out) throws Exception { long eventTime = fare.getEventTime(); TimerService timerService = ctx.timerService(); if (eventTime <= timerService.currentWatermark()) { // 事件延迟;其对应的窗口已经触发。 } else { // 将 eventTime 向上取值并将结果赋值到包含当前事件的窗口的末尾时间点。 long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); // 在窗口完成时将启用回调 timerService.registerEventTimeTimer(endOfWindow); // 将此票价的小费添加到该窗口的总计中。 Float sum = sumOfTips.get(endOfWindow); if (sum == null) { sum = 0.0F; } sum += fare.tip; sumOfTips.put(endOfWindow, sum); } } 需要考虑的事项:
-
延迟的事件怎么处理?watermark 后面的事件(即延迟的)正在被删除。 如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Side outputs),这将在下一节中解释。
-
本例使用一个
MapState,其中 keys 是时间戳(timestamp),并为同一时间戳设置一个 Timer。 这是一种常见的模式;它使得在 Timer 触发时查找相关信息变得简单高效。
onTimer() 方法 #
public void onTimer( long timestamp, OnTimerContext context, Collector<Tuple3<Long, Long, Float>> out) throws Exception { long driverId = context.getCurrentKey(); // 查找刚结束的一小时结果。 Float sumOfTips = this.sumOfTips.get(timestamp); Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips); out.collect(result); this.sumOfTips.remove(timestamp); } 注意:
-
传递给
onTimer的OnTimerContext context可用于确定当前 key。 -
我们的 pseudo-windows 在当前 Watermark 到达每小时结束时触发,此时调用
onTimer。 这个onTimer方法从sumOfTips中删除相关的条目,这样做的效果是不可能容纳延迟的事件。 这相当于在使用 Flink 的时间窗口时将 allowedLateness 设置为零。
性能考虑 #
Flink 提供了为 RocksDB 优化的 MapState 和 ListState 类型。 相对于 ValueState,更建议使用 MapState 和 ListState,因为使用 RocksDBStateBackend 的情况下, MapState 和 ListState 比 ValueState 性能更好。 RocksDBStateBackend 可以附加到 ListState,而无需进行(反)序列化, 对于 MapState,每个 key/value 都是一个单独的 RocksDB 对象,因此可以有效地访问和更新 MapState。
旁路输出(Side Outputs) #
简介 #
有几个很好的理由希望从 Flink 算子获得多个输出流,如下报告条目:
- 异常情况(exceptions)
- 格式错误的事件(malformed events)
- 延迟的事件(late events)
- operator 告警(operational alerts),如与外部服务的连接超时
旁路输出(Side outputs)是一种方便的方法。除了错误报告之外,旁路输出也是实现流的 n 路分割的好方法。
示例 #
现在你可以对上一节中忽略的延迟事件执行某些操作。
Side output channel 与 OutputTag<T> 相关联。这些标记拥有自己的名称,并与对应 DataStream 类型一致。
private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; 上面显示的是一个静态 OutputTag<TaxiFare> ,当在 PseudoWindow 的 processElement 方法中发出延迟事件时,可以引用它:
if (eventTime <= timerService.currentWatermark()) { // 事件延迟,其对应的窗口已经触发。 ctx.output(lateFares, fare); } else { . . . } 以及当在作业的 main 中从该旁路输出访问流时:
// 计算每个司机每小时的小费总和 SingleOutputStreamOperator hourlyTips = fares .keyBy((TaxiFare fare) -> fare.driverId) .process(new PseudoWindow(Duration.ofHours(1))); hourlyTips.getSideOutput(lateFares).print(); 或者,可以使用两个同名的 OutputTag 来引用同一个旁路输出,但如果这样做,它们必须具有相同的类型。
结语 #
在本例中,你已经了解了如何使用 ProcessFunction 重新实现一个简单的时间窗口。 当然,如果 Flink 内置的窗口 API 能够满足你的开发需求,那么一定要优先使用它。 但如果你发现自己在考虑用 Flink 的窗口做些错综复杂的事情,不要害怕自己动手。
此外,ProcessFunctions 对于计算分析之外的许多其他用例也很有用。 下面的实践练习提供了一个完全不同的例子。
ProcessFunctions 的另一个常见用例是清理过时 State。如果你回想一下 Rides and Fares Exercise , 其中使用 RichCoFlatMapFunction 来计算简单 Join,那么示例方案假设 TaxiRides 和 TaxiFares 两个事件是严格匹配为一个有效 数据对(必须同时出现)并且每一组这样的有效数据对都和一个唯一的 rideId 严格对应。如果数据对中的某个 TaxiRides 事件(TaxiFares 事件) 丢失,则同一 rideId 对应的另一个出现的 TaxiFares 事件(TaxiRides 事件)对应的 State 则永远不会被清理掉。 所以这里可以使用 KeyedCoProcessFunction 的实现代替它(RichCoFlatMapFunction),并且可以使用计时器来检测和清除任何过时 的 State。
实践练习 #
本节的实践练习是 Long Ride Alerts Exercise .