当你处理流媒体文件自动加载器(AWS|Azure|GCP),事件记录基于底层存储中创建的文件。
本文向您展示如何添加每个文件名的文件路径的新列DataFrame输出。
一个用例是审计。当文件被吸收到分区的文件夹结构通常是有用的元数据,如时间戳,可以从审计的路径。
例如,假设一个文件的路径和文件名2020/2021-01-01 / file1_T191634.csv。
从这条路可以应用定制udf和使用正则表达式来提取细节(2021-01-01)日期和时间戳(T191634)。
下面的示例代码使用input_file_name ()得到每一行的路径和文件名,写一个新列命名filePath。
% scala val df = spark.readStream.format (“cloudFiles”) . schema .option (“cloudFiles(模式)。格式”、“csv”) .option (“cloudFiles.region”、“ap-south-1”) .load .withColumn(“路径”)(“filePath input_file_name ())