工程的博客

本机支持会话窗口的火花结构化流

分享这篇文章

Apache火花™结构化流允许用户进行聚合windows在事件时间。在Apache 3.2火花™,火花支持windows和滑动窗口。在即将到来的Apache火花3.2中,我们添加了“会话窗口”作为新的windows支持类型,适用于流媒体和批处理查询

“会话窗口”是什么?


可视化例子Apache 3.2火花的时间窗口

暴跌windows是一系列的固定大小的重叠和连续的时间间隔。一个输入只能绑定到一个窗口。

滑动windows类似暴跌的“固定大小”,但windows可以重叠如果幻灯片的持续时间小于的持续时间窗口,在这种情况下,输入可以绑定多个窗口。

会话窗口有不同特征相比前两种类型。会话窗口有一个动态窗口长度的大小,取决于输入。开始一个会话窗口,其中有一个输入和扩展本身如果以下输入已经收到在时间的差距。一个会话窗口关闭当没有输入收到后的差距持续时间内接收最新的输入。这使您能够组事件直到没有新的事件特定时间段内的(不活跃)。

它的工作原理类似于网站上一个会话,会话超时——如果你登录一个网站,没有表现出任何活动持续时间,网站会提示您保持登录状态,迫使注销如果你还不活动超时之后已经超过了。会话超时扩展当你展示活动。

应用此会话窗口:一个新的会话窗口中启动一个新的事件时,如流媒体工作,在超时之后发生,事件将包括在相同的会话窗口。每个事件将会话超时,引入了一个不同的特点相比,其他时间窗口——会议的时间窗口不是静态的,而暴跌和滑动窗口都有一个静态时间。

如何实现一个查询使用一个会话窗口?

以前,火花需要您利用flatMapGroupsWithState处理会话窗口。你被要求工艺自己的逻辑定义会话窗口和如何总输入相同的会话。这带来了一些缺点:

  1. 你不能利用内置的聚合函数(如数量,金额等,必须做他们自己。
  2. 工艺是重要的考虑各种输出的逻辑模式和输入的迟到。
  3. 在PySpark flatMapGroupsWithState不可用;因此,你需要工艺查询通过Java / Scala。

现在,火花使用时间窗口提供了相同的用户体验。仍然是正确的,”这句话在结构化流,表达这种windows事件时间仅仅是执行一个特殊分组”。暴跌和滑动窗口,窗口的功能。会话窗口,一个新的函数session_window介绍。

例如,数量超过5分钟暴跌(重叠)窗户eventTime列后的事件可以被描述为。

#暴跌窗口windowedCountsDF=\eventsDF \.withWatermark (“eventTime”、“十分钟”)\.groupBy(“的deviceId”,窗口(“eventTime”,“十分钟”)\()#滑动窗口windowedCountsDF=\eventsDF \.withWatermark (“eventTime”、“十分钟”)\.groupBy(“的deviceId”,窗口(“eventTime”,“十分钟”,“5分钟”))\()

您可以简单地将函数“窗口”替换为“session_window”计算会话窗口以5分钟的差距在eventTime列事件。

#会话窗口windowedCountsDF=\eventsDF \.withWatermark (“eventTime”、“十分钟”)\session_window .groupBy(“的deviceId”(“eventTime”、“5分钟”))\()

会话窗口与动态差距持续时间

除了会话窗口,跨会话持续时间相同的差距,有另一种类型的会话窗口,每个会话持续时间有不同的差距。我们称之为“动态持续的差距。”

可视化例子的会话窗口与动态差距在Apache 3.2火花持续时间。

下面的框线的时间表示每个事件持续时间的差距。有四个事件和他们的(事件时间、差距持续时间)对(上午、4分钟),橙色(12:06 9分钟),黄色(吸5分钟),用绿色(12:15,5分钟)。

上面的框线表示实际的会话是由这些事件。您可以考虑每个事件作为一个个体会话,和会议有一个十字路口是合并成一个。正如你可能表明,会话的时间范围是“联盟”的时间范围包括所有事件在会话中。注意会议的结束时间不再是最新的事件的持续时间+差距在会话中。

新功能“session_window”接收两个参数,事件时间列和差距持续时间。

对于动态会话窗口,您可以提供一个“表达式”的“差距持续时间”参数“session_window”功能。表达式应该解析为一个区间,像“5分钟”。自“差距持续时间”参数接收一个表达式,您还可以利用UDF。

例如,计算在会话窗口与基于eventType列的动态间隙时间可以描述如下。

#定义会话窗口动态基于差距持续时间eventTypesession_window expr=session_window(事件。时间戳,\(events.eventType==类型1、5秒”)\(events.eventType==“type2”、“20秒”)\.otherwise(5分钟)#集团的数据通过会话窗口用户标识,计算出数量每一个集团windowedCountsDF=事件\.withWatermark(“时间戳”,“十分钟”)\.groupBy(事件。userID, session_window_expr) \()

会话窗口与FlatMapGroupsWithState的原生支持

“flatMapGroupsWithState”提供了更大的灵活性实现会话窗口,但它需要用户编写很多行代码。例如,请参考sessionization的例子在Apache通过flatMapGroupsWithState火花实现会话窗口。注意,sessionization例子在Apache火花非常简化,仅适用于处理时间& append模式对。处理事件的整体复杂性时间和各种输出模式抽象了原生支持会话的窗口。

火花原生支持会话的窗口的设置一个目标覆盖通用用例,因为它使火花来优化性能和状态存储使用。您可能还想利用flatMapGroupsWithState当你的业务用例需要复杂的会话窗口,例如,如果会话也应该结案了特定类型的事件无论静止。

结论

我们已经介绍了会话窗口流聚合,也适用于批量查询。学习如何使用新功能“session_window”,你可以利用你的知识流数据聚合的时间窗口,能够处理会话窗口。您可以利用内置的聚合函数,以及自己的UDAFs会话窗口聚合查询这也使SQL / PySpark用户处理会话窗口,在PySpark flatMapGroupsWithState API不可用,不能表示成一个SQL语句。

还有更多的房间来改进准时窗口操作,你需要使用flatMapGroupsWithState API。我们正计划研究自定义窗口业务在不久的将来。

尝试Apache 3.2火花砖10.0运行时

如果你想尝试即将到来的Apache 3.2火花砖10.0运行时,报名参加Community Edition砖或砖免费试用在几分钟内开始。使用火花3.2非常简单,只需选择版本“10.0”时启动集群。

免费试着砖

相关的帖子

看到所有工程的博客的帖子