问题
你有一个流的工作使用foreachBatch ()处理DataFrames。
% scala streamingDF.writeStream.outputMode(“追加”)。foreachBatch {(batchDF: DataFrame batchId:长)= > batchDF.write.format .mode(“铺”)(“覆盖”).save (output_directory)} .start ()
检查点文件被创建,但并没有被删除。
您可以验证问题,方法是导航到根目录并查看/ local_disk0 / tmp /文件夹中。检查点文件保留在文件夹中。
导致
命令foreachBatch ()用于支持DataFrame操作通常不支持流媒体DataFrames。通过使用foreachBatch ()可以将这些操作应用到每个micro-batch。这需要一个检查站目录跟踪流的更新。
如果你没有指定一个自定义的检查点位置,默认的检查点在创建目录/ local_disk0 / tmp /。
砖使用检查点目录来确保正确的和一致的进展信息。关闭流时,故意或偶然,检查点目录允许砖重启和接究竟在什么地方。
如果流被取消关闭流从笔记本,砖的工作试图清理力所能及的检查点目录。如果流以其他方式终止,或者终止工作,检查点目录没有清理干净。
这是设计。
解决方案
你应该手动指定检查点的目录checkpointLocation选择。
% scala streamingDF.writeStream.option (“checkpointLocation”、“< checkpoint-path >”) .outputMode(“追加”)。foreachBatch {(batchDF: DataFrame batchId:长)= > batchDF.write.format .mode(“铺”)(“覆盖”).save (output_directory)} .start ()