Apache卡夫卡

用于结构化流的Apache Kafka连接器打包在Databricks Runtime中。你可以使用卡夫卡连接器连接到Kafka 0.10+和kafka08连接器连接到Kafka 0.8+(已弃用)。

模式

记录的模式是:

类型

关键

二进制

价值

二进制

主题

字符串

分区

int

抵消

时间戳

timestampType

int

关键价值总是反序列化为字节数组与ByteArrayDeserializer.使用数据帧操作(铸造(“字符串”), udfs)来显式反序列化键和值。

快速入门

让我们从一个规范的WordCount示例开始。下面的笔记本演示了如何使用Kafka的结构化流运行WordCount。

请注意

本笔记本示例使用Kafka 0.10。要使用Kafka 0.8,将格式更改为kafka08(即,.format(“kafka08”)).

Kafka WordCount与结构化流笔记本

在新标签页打开笔记本

配置

有关配置选项的完整列表,请参见Spark结构化流+ Kafka集成指南.首先,这里是最常用配置选项的一个子集。

请注意

由于结构化流仍在开发中,此列表可能不是最新的。

有多种方法可以指定要订阅的主题。您应该只提供以下参数中的一个:

选项

价值

支持的Kafka版本

描述

订阅

以逗号分隔的主题列表。

0.8, 0.10

要订阅的主题列表。

subscribePattern

Java正则表达式字符串。

0.10

用于订阅主题的模式。

分配

JSON字符串{“局部药”:[0,1],“主题”:(2、4)}

0.8, 0.10

要使用的特定topicPartitions。

其他值得注意的配置:

选项

价值

默认值

支持的Kafka版本

描述

kafka.bootstrap.servers

主机:端口列表,以逗号分隔。

0.8, 0.10

[必选]卡夫卡bootstrap.servers配置。如果你发现没有来自Kafka的数据,首先检查代理地址列表。如果代理地址列表不正确,则可能没有任何错误。这是因为Kafka客户端假设代理最终会变得可用,并且在网络错误的情况下永远重试。

failOnDataLoss

真正的

真正的

0.10

[可选]当数据可能丢失时,查询是否失败。由于许多情况,比如删除主题、处理前截断主题等,查询可能永远无法从Kafka读取数据。我们试图保守地估计数据是否可能丢失。有时这会引起错误的警报。将此选项设置为如果它没有像预期的那样工作,或者您希望在数据丢失的情况下继续处理查询。

minPartitions

整数>= 0,0 = disabled。

0(禁用)

0.10

[可选]Kafka最小分区数在Spark 2.1.0-db2及以上版本中,您可以配置Spark使用任意最小的分区从Kafka读取minPartitions选择。通常Spark有Kafka topicPartitions到Kafka使用的Spark分区的1-1映射。如果你设置minPartitions选项的值大于你的Kafka topicPartitions, Spark会把大的Kafka分区划分成小的分区。可以在峰值负载、数据倾斜和流落后时设置此选项,以提高处理速度。这是以在每个触发器上初始化Kafka消费者为代价的,如果在连接到Kafka时使用SSL,这可能会影响性能。

kafka.group.id

Kafka消费组ID。

没有设置

0.10

[可选]从Kafka读取时使用的组ID。Spark 2.2+支持。请谨慎使用。默认情况下,每个查询生成一个唯一的组ID用于读取数据。这确保每个查询都有自己的消费者组,不受任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些情况下(例如,Kafka基于组的授权),您可能希望使用特定的授权组id来读取数据。您可以选择设置组ID。但是,这样做要非常谨慎,因为它可能会导致意想不到的行为。

  • 同时运行具有相同组ID的查询(批处理和流处理)可能会相互干扰,导致每个查询只能读取部分数据。

  • 在快速连续地启动/重新启动查询时也可能发生这种情况。为了减少这样的问题,设置Kafka消费者配置session.timeout.ms非常小。

startingOffsets

最早,最晚

最新的

0.10

[可选]查询开始时的起始点,可以是"最早的偏移量",也可以是指定每个TopicPartition起始偏移量的json字符串。在json中,-2作为偏移量可以用来表示最早,-1表示最晚。注意:对于批量查询,不允许使用latest(隐式或在json中使用-1)。对于流查询,这只适用于新查询开始时,并且恢复总是从查询停止的地方开始。查询期间新发现的分区最早开始。

看到结构化流式Kafka集成指南其他可选配置。

重要的

不应该为Kafka 0.10连接器设置以下Kafka参数,因为它会抛出异常:

  • group.id: 2.2以下版本不支持设置。

  • auto.offset.reset:请设置source选项startingOffsets指定从哪里开始。为了保持一致性,结构化流(相对于Kafka消费者)在内部管理偏移量的消耗。这确保在动态订阅新主题/分区后不会遗漏任何数据。startingOffsets仅当您开始一个新的流查询时才适用,并且从检查点恢复总是从查询停止的地方开始。

  • key.deserializer:键总是被反序列化为字节数组ByteArrayDeserializer.使用DataFrame操作显式地反序列化键。

  • value.deserializer:值总是被反序列化为字节数组ByteArrayDeserializer.使用DataFrame操作显式地反序列化值。

  • enable.auto.commit:不允许设置该参数。Spark会在内部跟踪Kafka的偏移量,不会提交任何偏移量。

  • interceptor.classes: Kafka源总是以字节数组的形式读取键和值。用起来不安全ConsumerInterceptor因为它可能会中断查询。

生产结构化流与卡夫卡笔记本

在新标签页打开笔记本

指标

请注意

在Databricks Runtime 8.1及以上版本中可用。

方法可以获得流查询在所有订阅主题中落后于最新可用偏移量的偏移量的平均值、最小值和最大值avgOffsetsBehindLatestmaxOffsetsBehindLatest,minOffsetsBehindLatest指标。看到交互式阅读指标

请注意

在Databricks Runtime 9.1及以上版本中可用。

的值,获得查询进程未从订阅的主题消耗的估计总字节数estimatedTotalBytesBehindLatest.这个估计是基于最近300秒内处理的批次。估算所基于的时间范围可以通过设置该选项来更改bytesEstimateWindowLength换一个不同的值。例如,设置为10分钟:

df火花readStream格式“卡夫卡”选项“bytesEstimateWindowLength”“10 m”//分钟可以使用“600年代”600

如果在笔记本中运行流,则可以在原始数据页中的流查询进度仪表板

“源”“描述”“KafkaV2订阅(主题)”“指标”“avgOffsetsBehindLatest”“4.0”“maxOffsetsBehindLatest”“4”“minOffsetsBehindLatest”“4”“estimatedTotalBytesBehindLatest”“80.0”},

使用SSL

要启用Kafka的SSL连接,请遵循Confluent文档中的说明使用SSL进行加密和身份验证.您可以提供此处描述的配置,并以卡夫卡。,作为选项。例如,在属性中指定信任存储位置kafka.ssl.truststore.location

我们建议您:

一旦路径挂载并存储秘密,您可以执行以下操作:

df火花readStream格式“卡夫卡”选项“kafka.bootstrap.servers”...选项“kafka.security.protocol”“SASL_SSL”选项“kafka.ssl.truststore.location”<dbfs-信任存储库-位置>选项“kafka.ssl.keystore.location”<dbfs-密钥存储库-位置>选项“kafka.ssl.keystore.password”dbutils秘密得到范围= <证书-范围-的名字>关键= <密钥存储库-密码-关键-的名字>))选项“kafka.ssl.truststore.password”dbutils秘密得到范围= <证书-范围-的名字>关键= <信任存储库-密码-关键-的名字>))