优化与.trigger流交易

使用.trigger定义存储更新间隔。更高的价值降低了存储事务的数量。

写的chetan.kardekar

去年发表在:2022年10月26日

结构化的流媒体应用程序运行时使用云存储桶(ADLS Gen2 S3,等等)很容易造成过度交易你访问的存储桶。

没有指定一个.trigger选项在流代码中一个常见的原因是大量的存储事务。当一个.trigger没有指定选项,存储经常可以调查。发生这种情况后立即在默认情况下每个micro-batch完成。

默认行为是在官方的描述Apache文档触发火花,“如果没有显式地指定触发器设置,那么在默认情况下,查询将在micro-batch模式下,执行,尽快将生成micro-batches前面micro-batch已完成处理”。

没有这个样例代码.trigger选项定义。如果运行,它将导致过度存储事务。

% python spark.readStream.format(“δ”).load (“< delta_table_path >”) .writeStream .format .outputMode(“δ”)(“追加”).option (“checkpointLocation”、“< checkpoint_path >”) .options (* * writeConfig) .start ()


你可以减少存储事务通过设置。触发选项.writeStream。设置.trigger处理时间几秒钟防止短轮询。

指令

默认行为是检查每10毫秒更新的源代码。对于大多数用户来说,一个更长的时间间隔源更新将对性能没有明显的影响,但交易成本大大降低。

例如,我们用一个5秒的处理时间。这比10 ms慢500倍。存储调用相应减少。

设置一个5秒的处理时间需要添加.trigger (processingTime = 5秒).writeStream

例如,修改现有包括示例代码.trigger5秒的处理时间只需要添加一行。

% python spark.readStream.format(“δ”).load (“< delta_table_path >”) .writeStream .format(“δ”).trigger (processingTime = 5秒)#添加行代码定义.trigger处理时间。.outputMode .option(“追加”)(“checkpointLocation”、“< checkpoint_path >”) .options (* * writeConfig) .start ()


你应该尝试.trigger处理时间来确定一个值,为您的应用程序进行了优化。

这篇文章有用吗?