检查点文件不删除当使用foreachBatch ()

学习如何防止foreachBatch()从使用大量的存储检查点文件。

写的亚当Pavlacka

去年发表在:2022年5月19日

问题

你有一个流的工作使用foreachBatch ()处理DataFrames。

% scala streamingDF.writeStream.outputMode(“追加”)。foreachBatch {(batchDF: DataFrame batchId:长)= > batchDF.write.format .mode(“铺”)(“覆盖”).save (output_directory)} .start ()

检查点文件被创建,但并没有被删除。

您可以验证问题,方法是导航到根目录并查看/ local_disk0 / tmp /文件夹中。检查点文件保留在文件夹中。

导致

命令foreachBatch ()用于支持DataFrame操作通常不支持流媒体DataFrames。通过使用foreachBatch ()可以将这些操作应用到每个micro-batch。这需要一个检查站目录跟踪流的更新。

如果你没有指定一个自定义的检查点位置,默认的检查点在创建目录/ local_disk0 / tmp /

砖使用检查点目录来确保正确的和一致的进展信息。关闭流时,故意或偶然,检查点目录允许砖重启和接究竟在什么地方。

如果流被取消关闭流从笔记本,砖的工作试图清理力所能及的检查点目录。如果流以其他方式终止,或者终止工作,检查点目录没有清理干净。

这是设计。

解决方案

你应该手动指定检查点的目录checkpointLocation选择。

% scala streamingDF.writeStream.option (“checkpointLocation”、“< checkpoint-path >”) .outputMode(“追加”)。foreachBatch {(batchDF: DataFrame batchId:长)= > batchDF.write.format .mode(“铺”)(“覆盖”).save (output_directory)} .start ()


这篇文章有用吗?