处理分区列值时使用一个SQS队列作为流源

写的亚当Pavlacka

去年发表在:2022年5月18日

问题

如果在S3中存储的数据分区,分区列值用于源目录结构中的文件夹名称。然而,如果您使用一个SQS队列作为流媒体来源,S3-SQS源不能检测到分区列值。

例如,如果您保存以下DataFrame S3 JSON格式:

% scala val df = spark.range (10) .withColumn(“日期”,当前日期())df.write.partitionBy(“日期”). json (s3a: / / bucket名/ json)

将下面的文件结构:

% scala s3a: / / bucket名/ json / _SUCCESS s3a: / / bucket名/ json /日期= 2018-10-25 / <单个json文件>

假设你有一个S3-SQS输入流创建从队列配置S3 bucket。如果你从这个S3-SQS直接加载数据输入流使用下面的代码:

% scala org.apache.spark.sql.types进口。_ val模式= StructType(列表(StructField (“id”, IntegerType,假),StructField(“日期”,DateType假)))显示(火花。readStream .format (“s3-sqs”) .option (“fileFormat”、“json”) .option (“queueUrl”、“https://sqs.us -东- 1. - amazonaws.com/826763667205/sqs队列”).option (“sqsFetchInterval”、“1 m”) .option (“ignoreFileDeletion”,真正的). schema(模式).load ())

的输出将会是:

不正确的SQS流的结果。

你可以看到没有正确填充列值的日期。

解决方案

您可以使用的组合input_file_name ()regexp_extract ()udf正确地提取日期值,就像下面的代码片段:

% scala org.apache.spark.sql.functions进口。_ val df =火花。readStream .format (“s3-sqs”) .option (“fileFormat”、“json”) .option (“queueUrl”、“https://sqs.us -东- 1. - amazonaws.com/826763667205/sqs队列”).option (“sqsFetchInterval”, fetch_interval) .option (“ignoreFileDeletion”,真正的). schema(模式).load()显示(df.withColumn(“日期”,regexp_extract (input_file_name(),“/日期= (\ \ d {4} - \ \ d {2} \ \ d {2}) / ", 1)))

现在你可以看到正确的日期列的值以下输出:

正确的SQS流的结果。


这篇文章有用吗?