Apache卡夫卡

Apache卡夫卡连接器为砖结构流打包运行时。您使用卡夫卡连接器连接到卡夫卡0.10 +的kafka08连接器连接到卡夫卡0.8 +(弃用)。

模式

记录的模式是:

类型

关键

二进制

价值

二进制

主题

字符串

分区

int

抵消

时间戳

timestampType

int

关键价值总是反序列化为字节数组ByteArrayDeserializer。使用DataFrame操作(铸造(“字符串”)udf)显式地对键和值进行反序列化。

快速入门

让我们先从一个规范化WordCount例子。以下笔记本演示了如何使用结构化流运行WordCount卡夫卡。

请注意

这个笔记本的例子使用0.10卡夫卡。0.8使用卡夫卡,改变格式kafka08(即,.format (“kafka08”))。

卡夫卡与结构化流WordCount笔记本

在新标签页打开笔记本

配置

comphensive列表的配置选项,请参阅火花结构化流+卡夫卡集成指南。在你开始之前,这是最常见的配置选项的子集。

请注意

结构化流仍在发展,这个列表可能不是最新的。

有多种方法的指定主题订阅。你应该只提供其中一个参数:

选项

价值

卡夫卡版本支持

描述

订阅

一个以逗号分隔的话题。

0.8,0.10

主题订阅列表。

subscribePattern

Java正则表达式字符串。

0.10

该模式用于订阅主题(s)。

分配

JSON字符串{“局部药”:[0,1],“主题”:(2、4)}

0.8,0.10

具体topicPartitions消费。

其他值得注意的配置:

选项

价值

默认值

卡夫卡版本支持

描述

kafka.bootstrap.servers

以逗号分隔的主持人:端口。

0.8,0.10

[要求]卡夫卡bootstrap.servers配置。如果你发现没有数据从卡夫卡,首先检查代理地址列表。如果代理地址列表不正确,可能没有任何错误。这是因为卡夫卡端假设经纪人最终会变得可用,在发生网络错误重试,直到永远。

failOnDataLoss

真正的

真正的

0.10

(可选的)是否失败的查询时数据丢失的可能。查询可以从卡夫卡永久无法读取数据,由于许多场景如删除话题,话题截断前处理,等等。我们试图估计保守是否数据可能丢失。有时这可能导致假警报。设置这个选项如果它不正常工作,或者你想要查询继续处理尽管数据丢失。

minPartitions

整数> = 0,0 =禁用。

0(禁用)

0.10

(可选的)最小数量的分区从卡夫卡读取。火花2.1.0-db2和上面,您可以配置火花使用任意最小的分区从卡夫卡使用读取minPartitions选择。通常火花的1:1映射卡夫卡topicPartitions从卡夫卡火花分区使用。如果你设置minPartitions选择一个值大于你的卡夫卡topicPartitions,火花将分配大型卡夫卡分区小的碎片。这个选项可以设置在高峰负荷,所以数据倾斜,当你流落后提高处理速度。时初始化卡夫卡在每个触发消费者的成本,这可能会影响性能,如果你使用SSL连接到卡夫卡。

kafka.group.id

卡夫卡消费者组ID。

没有设置

0.10

(可选的)组ID使用从卡夫卡在阅读。支持引发2.2 +。小心地使用这个。默认情况下,每个查询生成一个独特的组ID读取数据。这可以确保每个查询都有自己的消费群体,没有面临干扰其他消费者一样,因此可以阅读所有分区的订阅的主题。在某些情况下(例如,卡夫卡组的授权),您可能需要使用特定的授权组id读取数据。您可以选择设置组ID。然而,这个要特别小心,因为它可能导致不可预测的行为。

  • 并发运行查询(包括批处理和流媒体)使用相同的组ID可能会互相干扰,导致每个查询只读数据的一部分。

  • 这也可能出现在查询开始接二连三地/重新启动。减少这样的问题,设置卡夫卡消费者配置session.timeout.ms是非常小的。

startingOffsets

最早的,最新的

最新的

0.10

(可选的)查询时的起点开始,要么是“最早”,从最早的偏移量,或一个json字符串指定为每个TopicPartition的起始偏移量。在json, 2作为一个抵消最早可以用来参考,最新的1。注意:对于批处理查询,最新(隐式或通过使用1以json)是不允许的。对于流媒体查询,这只适用于当开始一个新的查询和恢复总是捡起从哪里查询。新发现的分区在查询将从最早开始。

看到结构化流卡夫卡集成指南其他可选的配置。

重要的

不应该设置以下卡夫卡参数卡夫卡0.10连接器,它将抛出一个异常:

  • group.id:设置这个参数是不允许火花版本低于2.2。

  • auto.offset.reset:相反,设置源选择startingOffsets指定从哪里开始。为了保持一致性,结构化流(而不是卡夫卡消费者)管理内部消费的补偿。这将确保你不要错过任何数据动态订阅后的新主题/分区。startingOffsets只适用于当你开始一个新的流媒体查询,,总是从一个检查点恢复好转的查询。

  • key.deserializer:钥匙总是反序列化为字节数组ByteArrayDeserializer。使用DataFrame操作来显式地对钥匙进行反序列化。

  • value.deserializer:值总是反序列化为字节数组ByteArrayDeserializer。使用DataFrame操作显式的值进行反序列化。

  • enable.auto.commit:设置这个参数是不允许的。火花跟踪卡夫卡抵消内部和不承诺任何抵消。

  • interceptor.classes:卡夫卡源总是读键和值字节数组。它使用起来不安全ConsumerInterceptor因为它可能打破查询。

生产结构流与卡夫卡的笔记本

在新标签页打开笔记本

指标

请注意

在砖运行时8.1及以上。

你可以得到平均最小值和最大值的补偿的数量背后的流媒体查询最新抵消在所有的订阅的主题avgOffsetsBehindLatest,maxOffsetsBehindLatest,minOffsetsBehindLatest指标。看到阅读指标交互

请注意

在砖运行时9.1及以上。

获得的字节总数估计查询过程没有消耗从订阅的主题通过检查的价值estimatedTotalBytesBehindLatest。这估计是基于批量加工的最后300秒。估计的时间框架是基于可以改变通过设置选项bytesEstimateWindowLength到一个不同的值。例如,将其设置为10分钟:

df=火花readStream\格式(“卡夫卡”)\选项(“bytesEstimateWindowLength”,“10 m”)/ /分钟,可以使用“600年代”600年

如果您正在运行流在一个笔记本,你可以看到这些度量标准下原始数据选项卡中流仪表板查询进展:

{“源”:({“描述”:“KafkaV2订阅(主题)”,“指标”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}

使用SSL

启用SSL连接卡夫卡,听从指示的融合性的文档通过SSL加密和身份验证。您可以提供描述的配置,前缀卡夫卡。,如选项。例如,您指定的信任存储位置属性kafka.ssl.truststore.location

我们建议您:

一旦安装和秘密存储路径,您可以执行以下操作:

df=火花readStream\格式(“卡夫卡”)\选项(“kafka.bootstrap.servers”,)\选项(“kafka.security.protocol”,“SASL_SSL”)\选项(“kafka.ssl.truststore.location”,<dbfs- - - - - -信任存储库- - - - - -位置>)\选项(“kafka.ssl.keystore.location”,<dbfs- - - - - -密钥存储库- - - - - -位置>)\选项(“kafka.ssl.keystore.password”,dbutils秘密得到(范围= <证书- - - - - -范围- - - - - -的名字>,关键= <密钥存储库- - - - - -密码- - - - - -关键- - - - - -的名字>))\选项(“kafka.ssl.truststore.password”,dbutils秘密得到(范围= <证书- - - - - -范围- - - - - -的名字>,关键= <信任存储库- - - - - -密码- - - - - -关键- - - - - -的名字>))