Apache卡夫卡
Apache卡夫卡连接器为砖结构流打包运行时。您使用卡夫卡
连接器连接到卡夫卡0.10 +的kafka08
连接器连接到卡夫卡0.8 +(弃用)。
模式
记录的模式是:
列 |
类型 |
---|---|
关键 |
二进制 |
价值 |
二进制 |
主题 |
字符串 |
分区 |
int |
抵消 |
长 |
时间戳 |
长 |
timestampType |
int |
的关键
和价值
总是反序列化为字节数组ByteArrayDeserializer
。使用DataFrame操作(铸造(“字符串”)
udf)显式地对键和值进行反序列化。
快速入门
让我们先从一个规范化WordCount例子。以下笔记本演示了如何使用结构化流运行WordCount卡夫卡。
请注意
这个笔记本的例子使用0.10卡夫卡。0.8使用卡夫卡,改变格式kafka08
(即,.format (“kafka08”)
)。
配置
comphensive列表的配置选项,请参阅火花结构化流+卡夫卡集成指南。在你开始之前,这是最常见的配置选项的子集。
请注意
结构化流仍在发展,这个列表可能不是最新的。
有多种方法的指定主题订阅。你应该只提供其中一个参数:
选项 |
价值 |
卡夫卡版本支持 |
描述 |
---|---|---|---|
订阅 |
一个以逗号分隔的话题。 |
0.8,0.10 |
主题订阅列表。 |
subscribePattern |
Java正则表达式字符串。 |
0.10 |
该模式用于订阅主题(s)。 |
分配 |
JSON字符串 |
0.8,0.10 |
具体topicPartitions消费。 |
其他值得注意的配置:
选项 |
价值 |
默认值 |
卡夫卡版本支持 |
描述 |
---|---|---|---|---|
kafka.bootstrap.servers |
以逗号分隔的主持人:端口。 |
空 |
0.8,0.10 |
[要求]卡夫卡 |
failOnDataLoss |
|
|
0.10 |
(可选的)是否失败的查询时数据丢失的可能。查询可以从卡夫卡永久无法读取数据,由于许多场景如删除话题,话题截断前处理,等等。我们试图估计保守是否数据可能丢失。有时这可能导致假警报。设置这个选项 |
minPartitions |
整数> = 0,0 =禁用。 |
0(禁用) |
0.10 |
(可选的)最小数量的分区从卡夫卡读取。火花2.1.0-db2和上面,您可以配置火花使用任意最小的分区从卡夫卡使用读取 |
kafka.group.id |
卡夫卡消费者组ID。 |
没有设置 |
0.10 |
(可选的)组ID使用从卡夫卡在阅读。支持引发2.2 +。小心地使用这个。默认情况下,每个查询生成一个独特的组ID读取数据。这可以确保每个查询都有自己的消费群体,没有面临干扰其他消费者一样,因此可以阅读所有分区的订阅的主题。在某些情况下(例如,卡夫卡组的授权),您可能需要使用特定的授权组id读取数据。您可以选择设置组ID。然而,这个要特别小心,因为它可能导致不可预测的行为。
|
startingOffsets |
最早的,最新的 |
最新的 |
0.10 |
(可选的)查询时的起点开始,要么是“最早”,从最早的偏移量,或一个json字符串指定为每个TopicPartition的起始偏移量。在json, 2作为一个抵消最早可以用来参考,最新的1。注意:对于批处理查询,最新(隐式或通过使用1以json)是不允许的。对于流媒体查询,这只适用于当开始一个新的查询和恢复总是捡起从哪里查询。新发现的分区在查询将从最早开始。 |
看到结构化流卡夫卡集成指南其他可选的配置。
重要的
你不应该设置以下卡夫卡参数卡夫卡0.10连接器,它将抛出一个异常:
group.id
:设置这个参数是不允许火花版本低于2.2。auto.offset.reset
:相反,设置源选择startingOffsets
指定从哪里开始。为了保持一致性,结构化流(而不是卡夫卡消费者)管理内部消费的补偿。这将确保你不要错过任何数据动态订阅后的新主题/分区。startingOffsets
只适用于当你开始一个新的流媒体查询,,总是从一个检查点恢复好转的查询。key.deserializer
:钥匙总是反序列化为字节数组ByteArrayDeserializer
。使用DataFrame操作来显式地对钥匙进行反序列化。value.deserializer
:值总是反序列化为字节数组ByteArrayDeserializer
。使用DataFrame操作显式的值进行反序列化。enable.auto.commit
:设置这个参数是不允许的。火花跟踪卡夫卡抵消内部和不承诺任何抵消。interceptor.classes
:卡夫卡源总是读键和值字节数组。它使用起来不安全ConsumerInterceptor
因为它可能打破查询。
指标
请注意
在砖运行时8.1及以上。
你可以得到平均最小值和最大值的补偿的数量背后的流媒体查询最新抵消在所有的订阅的主题avgOffsetsBehindLatest
,maxOffsetsBehindLatest
,minOffsetsBehindLatest
指标。看到阅读指标交互。
请注意
在砖运行时9.1及以上。
获得的字节总数估计查询过程没有消耗从订阅的主题通过检查的价值estimatedTotalBytesBehindLatest
。这估计是基于批量加工的最后300秒。估计的时间框架是基于可以改变通过设置选项bytesEstimateWindowLength
到一个不同的值。例如,将其设置为10分钟:
df=火花。readStream\。格式(“卡夫卡”)\。选项(“bytesEstimateWindowLength”,“10 m”)/ /米为分钟,你可以也使用“600年代”为600年秒
如果您正在运行流在一个笔记本,你可以看到这些度量标准下原始数据选项卡中流仪表板查询进展:
{“源”:({“描述”:“KafkaV2订阅(主题)”,“指标”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}
使用SSL
启用SSL连接卡夫卡,听从指示的融合性的文档通过SSL加密和身份验证。您可以提供描述的配置,前缀卡夫卡。
,如选项。例如,您指定的信任存储位置属性kafka.ssl.truststore.location
。
我们建议您:
一旦安装和秘密存储路径,您可以执行以下操作:
df=火花。readStream\。格式(“卡夫卡”)\。选项(“kafka.bootstrap.servers”,…)\。选项(“kafka.security.protocol”,“SASL_SSL”)\。选项(“kafka.ssl.truststore.location”,<dbfs- - - - - -信任存储库- - - - - -位置>)\。选项(“kafka.ssl.keystore.location”,<dbfs- - - - - -密钥存储库- - - - - -位置>)\。选项(“kafka.ssl.keystore.password”,dbutils。秘密。得到(范围= <证书- - - - - -范围- - - - - -的名字>,关键= <密钥存储库- - - - - -密码- - - - - -关键- - - - - -的名字>))\。选项(“kafka.ssl.truststore.password”,dbutils。秘密。得到(范围= <证书- - - - - -范围- - - - - -的名字>,关键= <信任存储库- - - - - -密码- - - - - -关键- - - - - -的名字>))