从结构化流查询故障中恢复

结构化流提供容错和数据流查询的一致性;使用砖工作流,您可以很容易地配置您的结构化流查询自动重新启动失败。通过使检查点流查询,您可以重新启动后查询失败。重新启动查询继续失败的一个。

使检查点结构化流查询

砖建议你总是指定checkpointLocation选择一个云存储路径在你开始查询。例如:

streamingDataFramewriteStream格式(“铺”)选项(“路径”,“/道路/ /表”)选项(“checkpointLocation”,“/道路/ /表/ _checkpoint”)开始()

这个检查点位置保存的所有基本信息,识别一个查询。每个查询都必须有一个不同的检查点位置。多个查询不应该相同的位置。有关更多信息,请参见结构化流编程指南

请注意

checkpointLocation大多数类型的输出需要下沉,下沉,如内存,可以自动生成一个临时检查点位置当你不提供checkpointLocation。这些临时检查点位置不保证任何容错或数据一致性保证和可能无法清理干净。避免潜在的缺陷总是指定一个checkpointLocation

配置结构化流工作重新启动流查询失败

您可以创建一个砖工作你的笔记本或JAR流查询和配置:

  • 总是使用一个新的集群。

  • 总是在失败重试。

工作有紧密集成结构化流api和可以监控所有流查询活跃在跑步。这个配置确保如果任何查询的一部分失败,工作自动终止运行查询(以及其他所有)和启动一个新的运行在一个新的集群。这种消遣笔记本或JAR代码并重新启动所有的查询。这是最安全的方式返回到一个好的状态。

请注意

  • 失败的活动流查询导致活动运行失败,终止所有其他流媒体查询。

  • 您不需要使用streamingQuery.awaitTermination ()spark.streams.awaitAnyTermination ()在你的笔记本。工作时自动防止运行完成流媒体查询是活跃的。

  • 砖建议用工作代替运行%dbutils.notebook.run ()当策划结构化流笔记本。看到运行一个砖笔记本从另一个笔记本

以下是推荐工作配置的一个示例。

  • 集群:设置这个总是使用一个新的集群和使用最新的火花版本(或至少2.1版本)。查询开始引发2.1及以上的查询和火花版本升级后可恢复。

  • 通知:设置这个如果你希望电子邮件通知失败。

  • 时间表:不设置一个时间表

  • 超时:不设置一个超时。流媒体查询无限期地为一个长的时间。

  • 最大并发运行:设置为1。同时只能有一个实例,每个查询活跃。

  • 重试:设置为无限的

看到创建和运行数据砖的工作理解这些配置。

更改后的恢复以结构化查询流

有什么变化限制在流查询允许重启之间相同的检查点位置。这里有一些种类的变化,要么是不允许的,或者改变的影响并不明确。所有人:

  • 这个词允许意味着你能做指定的改变,但是其效果的语义是否明确定义取决于查询和改变。

  • 这个词不允许意味着你不应该做指定的变化重新查询与不可预测的错误可能会失败。

  • 自卫队代表一个流DataFrame /数据集生成sparkSession.readStream

类型的结构化查询流的变化

  • 数量和类型的变化(即不同的源)的输入源:这是不允许的。

  • 输入源的参数的变化:这是否允许,是否定义良好的语义变化取决于源和查询。这里有一些例子。

    • 添加、删除和修改的速度限制是允许的:

      火花readStream格式(“卡夫卡”)。选项(“订阅”,“文章”)

      火花readStream格式(“卡夫卡”)。选项(“订阅”,“文章”)。选项(“maxOffsetsPerTrigger”,…)
    • 修改订阅的文章和文件通常不允许结果是不可预测的:spark.readStream.format(“卡夫卡”).option(“订阅”,“文章”)spark.readStream.format(“卡夫卡”).option(“订阅”,“newarticle”)

  • 改变输出的类型:一些特定组合的水槽之间的变化是允许的。这需要在个案基础上进行验证。这里有一些例子。

    • 文件沉到卡夫卡水槽是被允许的。卡夫卡只会看到新的数据。

    • 卡夫卡水槽,水槽文件是不允许的。

    • 卡夫卡水槽改为foreach,反之亦然。

  • 参数的变化输出下沉:这是否允许,是否定义良好的语义变化取决于水槽和查询。这里有一些例子。

    • 更改文件的输出目录水槽是不允许的:sdf.writeStream.format(“铺”).option(“路径”,“/ somePath”)sdf.writeStream.format(“铺”).option(“路径”,“/ anotherPath”)

    • 修改输出主题是允许的:sdf.writeStream.format(“卡夫卡”).option(“主题”,“人类”)sdf.writeStream.format(“卡夫卡”).option(“主题”,“话题二”)

    • 更改用户定义的foreach水槽(即ForeachWriter代码)是允许的,但改变依赖于代码的语义。

  • 投影的变化/过滤器/类似操作:有些情况下是允许的。例如:

    • 添加/删除过滤器是允许的:sdf.selectExpr (“”)sdf.where (…) .selectExpr (“a”) .filter (…)

    • 与相同的输出模式允许变化的预测:sdf.selectExpr (“stringColumn作为json) .writeStreamsdf.select (to_json (…)。as (json)) .writeStream

    • 预测的变化与不同的输出模式是有条件地允许:sdf.selectExpr .writeStream (“a”)sdf.selectExpr .writeStream (b)是只允许如果输出水槽允许模式改变的“一个”“b”

  • 变化有状态操作:一些流媒体业务查询需要维护状态数据,以不断更新的结果。结构化流自动检查点状态数据容错存储(例如,DBFS AWS S3, Azure Blob存储)和恢复后重新启动。然而,这种假设状态数据的模式仍然在重启时相同。这意味着任何更改(添加、删除或修改模式)的有状态操作流查询之间不允许重启。这是有状态的列表操作的模式重新启动之间不应该被改变,以确保国家恢复:

    • 流媒体聚合:例如,sdf.groupBy (“a”) .agg (…)。任何变化在数量或类型的分组关键字或聚合是不允许的。

    • 流媒体重复数据删除:例如,sdf.dropDuplicates (“”)。任何变化在数量或类型的分组关键字或聚合是不允许的。

    • Stream-stream加入:例如,sdf1.join (sdf2…)(即两个输入生成sparkSession.readStream)。变化的模式或等值连接列是不允许的。加入的变化类型(外部或内部)不允许的。其他的变化加入条件是不明确的。

    • 任意的有状态操作:例如,sdf.groupByKey (…) .mapGroupsWithState (…)sdf.groupByKey (…) .flatMapGroupsWithState (…)。任何改变的模式定义的状态和超时的类型是不允许的。任何改变在用户定义的状态映照函数是允许的,但语义效应的变化取决于用户定义的逻辑。如果你真的想支持模式变化状态,那么您可以显式编码/解码复杂状态数据结构到字节使用一种编码/解码方案,支持模式迁移。举个例子,如果你保存状态Avro-encoded字节,那么你可以改变查询之间的Avro-state-schema重启恢复二进制状态。