如何启动一个结构化流查询从去年写抵消吗

学习如何重启一个结构化流查询上次写的偏移量。

写的亚当Pavlacka

去年发表在:2022年5月18日

场景

你有流,运行窗口的聚合查询,读来自Apache卡夫卡和写文件附加模式。你想升级的应用程序并重新启动查询抵消等于最后写抵消。你想丢弃所有状态信息还没有被写入下沉,开始处理从最早的偏移导致废弃状态,并相应地修改目录检查站。

然而,如果您使用现有的检查点升级应用程序代码后,旧州和对象重用以前的应用程序版本,导致意想不到的输出如阅读旧来源或处理旧的应用程序代码。

解决方案

Apache火花保持状态在执行检查点和二进制对象。因此您不能修改检查点目录。作为一种替代方法,复制和更新偏移量与输入记录并存储在文件或数据库中。读它在下一次重新启动的初始化和使用相同的值readStream。确保删除检查点目录。

你可以通过使用异步api:当前偏移量

% scala spark.streams。addListener(新StreamingQueryListener(){覆盖def onQueryStarted (queryStarted: QueryStartedEvent):单位= {println(“查询开始:”+ queryStarted.id)}覆盖def onQueryTerminated (queryTerminated: QueryTerminatedEvent):单位= {println(“查询终止”+ queryTerminated.id)}覆盖def onQueryProgress (queryProgress: QueryProgressEvent):单位= {println(“查询取得进展”)println(“开始抵消:”+ queryProgress.progress.sources (0) .startOffset) println(“结束偏移量:”+ queryProgress.progress.sources (0) .endOffset) / /逻辑来保存这些补偿}})

您可以使用readStream所写的最新抵消过程如上所示:

% scala选项(startingOffsets“”、“”{“articleA”:{“0”: 23日,“1”:1},“articleB”: {“0”: 2}} " " ")

流媒体记录的输入模式:

根|——关键:二进制(nullable = true) |——价值:二进制(nullable = true) |——文章:字符串(nullable = true) |——分区:整数(nullable = true) |——抵消:长(可空= true) |——时间戳:时间戳(nullable = true) |——timestampType:整数(nullable = true)

同样,你可以实现逻辑保存并更新抵消数据库和读它在下次重新启动。

这篇文章有用吗?