跳到主要内容
bob体育客户端下载平台的博客

功能深入:在Apache Spark结构化流水印

2022年8月22日 产品

分享这篇文章

关键的外卖

  • 水印帮助Spark了解基于事件时间的处理进度,何时生成带窗口的聚合以及何时修剪聚合状态
  • 当连接数据流时,默认情况下,Spark使用单个全局水印,根据在输入流中看到的最小事件时间来驱逐状态
  • 可以利用RocksDB来减少对集群内存和GC暂停的压力
  • StreamingQueryProgressStateOperatorProgress对象包含有关水印如何影响流的关键信息

介绍

在构建实时管道时,团队必须处理的现实之一是分布式数据摄取本质上是无序的。另外,在有状态的上下文中流媒体操作方面,团队需要能够正确地跟踪数据流中的事件时间进度,以便正确计算时间窗口聚合和其他有状态操作。我们可以使用结构化流解决所有这些问题。

例如,假设我们是一个团队,致力于建立一个管道,以帮助我们的公司对我们租赁给客户的矿机进行主动维护。这些机器总是需要在最佳状态下运行,所以我们对它们进行实时监控。我们需要对流数据执行有状态聚合,以理解和识别机器中的问题。

这就是我们需要利用结构化流和水印来生成必要的有状态聚合的地方,这将有助于为这些机器的预测性维护和更多决策提供信息。

什么是水印?

一般来说,在处理实时流数据时,由于数据的摄取方式以及整个应用程序是否遇到停机等问题,事件时间和处理时间之间会有延迟。由于这些潜在的可变延迟,用于处理此数据的引擎需要具有某种机制来决定何时关闭聚合窗口并生成聚合结果。

虽然解决这些问题的自然倾向可能是使用基于挂钟时间的固定延迟,但我们将在接下来的示例中展示为什么这不是最佳解决方案。

为了直观地解释这一点,让我们以一个场景为例,我们在上午10:50→11:20左右的不同时间接收数据。我们正在创建一个10分钟的滚动窗口,用来计算窗口期间的平均温度和压力读数。

在第一张图中,我们在上午11点、11点10分和11点20分触发了滚动窗口,导致了各自时间的结果表。当第二批数据在上午11:10左右出现时,数据的事件时间为10:53 AM,这被合并到11:00 AM→11:10 AM窗口的温度和压力平均值中,该窗口在11:10 AM关闭,这不会给出正确的结果。

一个结构化流管道的可视化表示,它摄取成批的温度和压力数据

为了确保得到想要生成的聚合的正确结果,我们需要定义a水印这将允许Spark了解何时关闭聚合窗口并产生正确的聚合结果。

在结构化流应用程序中,我们可以确保我们想要计算的聚合的所有相关数据都通过使用称为水印。在最基本的意义上,通过定义a水印然后,Spark结构化流知道它何时已经摄取了所有数据,直到一段时间,T,(基于设置的延迟期望),以便它可以关闭并产生时间戳范围内的窗口聚合T

第二个视觉效果显示了在Spark结构化流中实现10分钟的水印和使用追加模式的效果。

当应用于结构化流管道时,10分钟水印效果的可视化表示。

与第一个场景不同的是,Spark每隔十分钟就会发出前十分钟的窗口聚合(即在11:10 AM发出11:00 AM→11:10 AM窗口),现在Spark等待关闭并输出一次窗口聚合看到的最大事件时间减去指定的水印大于窗口的上界。

换句话说,Spark需要等到它看到最近的事件时间减去10分钟大于11:00 AM的数据点时,才会发出10:50 AM→11:00 AM聚合窗口。在11:00 AM,它没有看到这个,所以它只在Spark的内部状态存储中初始化聚合计算。在11:10 AM,这个条件仍然没有满足,但是我们有一个10:53 AM的新数据点,所以内部状态得到更新,只是没有发出。最后到11:20 AM, Spark看到了一个事件时间为11:15 AM的数据点,因为11:15 AM减去10分钟是11:05 AM,晚于11:00 AM, 10:50 AM→11:00 AM窗口可以发射到结果表中。

根据水印定义的预期延迟,适当地合并数据,从而产生正确的结果。发出结果后,将从状态存储中删除相应的状态。

将水印纳入您的管道

为了理解如何将这些水印合并到结构化流管道中,我们将通过一个实际的代码示例来探索这个场景,该示例基于本博客介绍部分中所述的用例。

假设我们从云端的Kafka集群中摄取所有传感器数据,我们希望每十分钟计算一次温度和压力平均值,预期时间偏差为十分钟。带有水印的结构化流管道看起来像这样:

PySpark

sensorStreamDF=火花\.readStream \.format \(“卡夫卡”).option(“kafka.bootstrap。Servers ", "host1:port1,host2:port2") \.option("subscribe", " tempandpressurerereads ") \.load ()sensorStreamDF=sensorStreamDF \.withWatermark("eventTimestamp", "10分钟")\.groupBy (窗口(sensorStreamDF。eventTimestamp, "10分钟"))\avg(sensorStreamDF.temperaturesensorStreamDF.pressure)
              sensorStreamDF.writeStream.format(“δ”).outputMode(“追加”).option(“checkpointLocation”、“/δ/事件/ _checkpoints / temp_pressure_job /”)开始(“/δ/ temperatureAndPressureAverages”)

这里我们简单地从Kafka读取数据,应用我们的转换和聚合,然后写入Delta Lake表,这些表将在Databricks SQL中被可视化和监控。为特定数据样本写入表的输出看起来像这样:

上面PySpark代码示例中定义的流查询的输出

要加入水印,我们首先需要识别两个项目:

  1. 表示传感器读取事件时间的列
  2. 估计数据的预期时间偏差

从前面的例子中,我们可以看到由.withWatermark ()方法,使用eventTimestamp列作为事件时间列,并使用10分钟来表示我们期望的时间偏差。

PySpark

sensorStreamDF=sensorStreamDF \.withWatermark("eventTimestamp", "10分钟")\.groupBy (窗口(sensorStreamDF。eventTimestamp, "10分钟"))\avg(sensorStreamDF.temperaturesensorStreamDF.pressure)

既然我们知道了如何在结构化流管道中实现水印,那么理解其他项目(如流连接操作和管理状态)如何受到水印的影响将是很重要的。此外,当我们扩展管道时,我们的数据工程师将需要了解和监控一些关键指标,以避免性能问题。我们将探索所有这一切,因为我们深入到水印。

水印在不同的输出模式

在我们深入研究之前,重要的是要了解您选择的输出模式如何影响您设置的水印的行为。

水印只能在运行流媒体应用程序时使用附加更新输出模式。还有第三种输出模式,即完整模式,在这种模式下,整个结果表被写入存储器。此模式不能使用,因为它要求保留所有聚合数据,因此不能使用水印来删除中间状态。

在窗口聚合和水印上下文中,这些输出模式的含义是,在“追加”模式下,聚合只能产生一次,并且不能更新。因此,一旦产生聚合,引擎就可以删除聚合的状态,从而使整个聚合状态保持有界。后期记录——那些近似水印启发式不适用的记录(它们比水印延迟期更早),因此必须被删除——聚合已经产生,聚合状态被删除。

相反,对于“更新”模式,聚合可以从第一个记录开始,并在每个接收到的记录上重复产生,因此水印是可选的。水印仅在启发式地当引擎知道不再可以接收聚合的记录时才用于修剪状态。一旦状态被删除,任何迟来的记录都必须被删除,因为聚合值已经丢失并且无法更新。

了解状态、延迟到达的记录和不同的输出模式如何导致在Spark上运行的应用程序的不同行为是很重要的。这里的主要收获是,在附加和更新模式中,一旦水印表明在聚合时间窗口中接收到所有数据,引擎就可以修剪窗口状态。在附加模式下,聚合仅在时间窗口加上水印延迟关闭时产生,而在更新模式下,它在每次更新窗口时产生。

最后,通过增加水印延迟窗口,您将导致管道等待数据的时间更长,并且可能会丢失更少的数据-更高的精度,但也会增加产生聚合的延迟。另一方面,较小的水印延迟会导致较低的精度,但也会降低产生聚合的延迟。

窗口延迟长度 精度 延迟
更长的延迟窗口 更高的精度 更高的延迟
更短的延迟窗口 低精度 更低的延迟

更深入地了解水印

连接和水印

在流应用程序中执行连接操作时,特别是在连接两个流时,需要注意一些事项。假设对于我们的用例,我们希望将关于温度和压力读数的流数据集与机器上其他传感器捕获的附加值连接起来。

结构化流中可以实现三种主要类型的流-流连接:内部连接、外部连接和半连接。在流应用程序中进行连接的主要问题是,您可能对连接的一侧有一个不完整的了解。让Spark了解什么时候没有预期的未来匹配,这与之前的聚合问题类似,在聚合计算之前,Spark需要了解什么时候没有新行可以合并到聚合计算中。

为了允许Spark处理这个问题,我们可以在流-流连接的连接条件中利用水印和事件时间约束的组合。这种组合允许Spark过滤掉延迟的记录,并通过连接上的时间范围条件修剪连接操作的状态。我们在下面的例子中进行演示:

PySpark

sensorStreamDF=spark.readStream.format(“δ”)。表格(“sensorData”)tempAndPressStreamDF=spark.readStream.format(“δ”)。表格(“tempPressData”)sensorStreamDF_wtmrk=sensorStreamDF。withWatermark("timestamp", "5 minutes")tempAndPressStreamDF_wtmrk=tempAndPressStreamDF。withWatermark("timestamp", "5 minutes")joinedDF=tempAndPressStreamDF_wtmrk.alias(“t”)。加入sensorStreamDF_wtmrk.alias(“s”),expr(“””s.sensor_id == t.sensor_id AND.timestamp >= .timestamp ANDS.timestamp <= t.timestamp + interval 5分钟”“”),joinType=“内心”).withColumn(“sensorMeasure坳(“Sensor1”)+坳(“Sensor2”))\.groupBy (窗口(col("t.timestamp"), "10分钟"))\.agg (avg(坳(“sensorMeasure”)).alias(“avg_sensor_measure”),avg(坳(“温度”)).alias(“avg_temperature”),avg(坳(“压力”)).alias \(“avg_pressure”))选择(“window”,“avg_sensor_measure”,“avg_temperature”,“avg_pressure”)
              joinedDF.writeStream.format \(“δ”).outputMode \(“追加”).option("checkpointLocation", "/checkpoint/files/") \.toTable(“output_table”)

然而,与上述示例不同的是,有时每个流可能需要不同的时间偏差来显示其水印。在这个场景中,Spark有一个处理多个水印定义的策略。火花维护一个全局水印这是基于最慢的流,以确保在不丢失数据时最高的安全性。

开发人员确实有能力通过更改来改变这种行为spark.sql.streaming.multipleWatermarkPolicy马克思;然而,这意味着来自较慢流的数据将被丢弃。

要查看需要或可能利用水印的所有连接操作,请查看本节Spark的文档。

监控和管理流与水印

当管理流查询时,Spark可能需要管理数百万个键并保存每个键的状态,Databricks集群附带的默认状态存储可能不太有效。您可能会看到更高的内存利用率,然后出现更长的垃圾收集暂停时间。这些都会阻碍结构化流应用程序的性能和可伸缩性。

这就是RocksDB发挥作用的地方。通过在Spark配置中启用RocksDB,你可以在Databricks中本地利用它:

spark.conf。“spark.sql.streaming.stateStore.providerClass”“com.databricks.sql.streaming.state.RocksDBStateStoreProvider”

这将允许运行结构化流应用程序的集群利用RocksDB,它可以更有效地管理本地内存中的状态,并利用本地磁盘/SSD,而不是将所有状态保存在内存中。

除了跟踪内存使用和垃圾收集指标之外,在处理水印和结构化流时还应该收集和跟踪其他关键指标和指标。要访问这些指标,您可以查看StreamingQueryProgressStateOperatorProgress对象。查看我们的文档,了解如何使用这些功能在这里

在StreamingQueryProgress对象中,有一个名为“eventTime”的方法可以被调用,它将返回马克斯最小值avg,水印时间戳。前三个是看到的最大、最小和平均事件时间在那个触发器里。最后一个触发器中使用的水印。

StreamingQueryProgress对象的简短示例

{“id”“f4311acb - 15 - da - 4 - dc3 - 80 - b2 - acae4a0b6c11”. . . .“eventTime”: {“平均”“2021 - 02 - 14 - t10:56:06.000z”“马克斯”“2021 - 02 - 14 - t11:01:06.000z”“最小值”“2021 - 02 - 14 - t10:51:06.000z”“水印”“2021 - 02 - 14 - t10:41:06.000z”},“stateOperators”: [{“operatorName”“stateStoreSave”“numRowsTotal”7“numRowsUpdated”0“allUpdatesTimeMs”205“numRowsRemoved”0“allRemovalsTimeMs”233“commitTimeMs”15182“memoryUsedBytes”91504“numRowsDroppedByWatermark”0“numShufflePartitions”200“numStateStoreInstances”200“customMetrics”: {“loadedMapCacheHitCount”4800“loadedMapCacheMissCount”0“stateOnCurrentVersionSizeBytes”25680}}. . . .}

这些信息片段可用于协调流查询输出的结果表中的数据,也可用于验证所使用的水印是否为预期的eventTime时间戳。当您将数据流连接在一起时,这一点非常重要。

在StateOperatorProgress对象中有numRowsDroppedByWatermark指标。该指标将显示有多少行被认为太晚而无法包含在有状态聚合中。注意,这个指标测量的是丢失的行数事后的而不是原始输入行,因此该数字不精确,但可以指示有延迟的数据被丢弃。这与来自StreamingQueryProgress对象的信息相结合,可以帮助开发人员确定水印是否正确配置。

多聚合、流和水印

结构化流查询的一个限制是在单个流查询中链接多个有状态操作符(例如聚合、流连接)。对于有状态聚合,单一全局水印的限制是我们Databricks正在研究的解决方案,并将在未来几个月发布更多信息。查看我们的博客项目光速了解更多信息:BOB低频彩项目Lightspeed:使用Apache Spark更快更简单的流处理(www.neidfyre.com)

结论

使用Databricks上的结构化流和水印,像上面描述的用例一样的组织可以构建有弹性的实时应用程序,以确保即使数据没有正确排序或准时,也能准确计算由实时聚合驱动的指标。要了解BOB低频彩有关如何使用Databricks构建实时应用程序的更多信息,请联系Databricks代表。

免费试用Databricks
看到所有产品的帖子