结构化的流媒体应用程序运行时使用云存储桶(ADLS Gen2 S3,等等)很容易造成过度交易你访问的存储桶。
没有指定一个.trigger选项在流代码中一个常见的原因是大量的存储事务。当一个.trigger没有指定选项,存储经常可以调查。发生这种情况后立即在默认情况下每个micro-batch完成。
默认行为是在官方的描述Apache文档触发火花,“如果没有显式地指定触发器设置,那么在默认情况下,查询将在micro-batch模式下,执行,尽快将生成micro-batches前面micro-batch已完成处理”。
没有这个样例代码.trigger选项定义。如果运行,它将导致过度存储事务。
% python spark.readStream.format(“δ”).load (“< delta_table_path >”) .writeStream .format .outputMode(“δ”)(“追加”).option (“checkpointLocation”、“< checkpoint_path >”) .options (* * writeConfig) .start ()
你可以减少存储事务通过设置。触发选项.writeStream。设置.trigger处理时间几秒钟防止短轮询。
指令
默认行为是检查每10毫秒更新的源代码。对于大多数用户来说,一个更长的时间间隔源更新将对性能没有明显的影响,但交易成本大大降低。
例如,我们用一个5秒的处理时间。这比10 ms慢500倍。存储调用相应减少。
设置一个5秒的处理时间需要添加.trigger (processingTime = 5秒)到.writeStream。
例如,修改现有包括示例代码.trigger5秒的处理时间只需要添加一行。
% python spark.readStream.format(“δ”).load (“< delta_table_path >”) .writeStream .format(“δ”).trigger (processingTime = 5秒)#添加行代码定义.trigger处理时间。.outputMode .option(“追加”)(“checkpointLocation”、“< checkpoint_path >”) .options (* * writeConfig) .start ()
你应该尝试.trigger处理时间来确定一个值,为您的应用程序进行了优化。