得到所有文件最后修改时间在自动加载程序和批处理作业

定义一个UDF列出所有文件路径和为每一个返回的最后修改时间。

写的DD沙玛

去年发表在:2022年12月1日

您正在运行一个流的工作自动加载器(AWS|Azure|GCP),想要每个文件的最后修改时间存储账户。

指令

得到文件的路径被自动加载程序文章描述了如何得到所有文件的文件名和路径被自动加载程序。在本文中,我们建立在这一基础上,并使用示例代码演示如何使用一个定制的UDF,然后提取文件的最后修改时间。

  1. 首先定义您的进口和变量。您需要定义< storage-base-path >,以及< input-dir >,< output-dir >您正在使用。
    进口org.apache.hadoop.conf。配置进口org.apache.hadoop.fs。{FileStatus、文件系统、路径}org.apache.spark.sql.functions进口。{input_file_name坳,udf, from_unixtime} org.apache.spark.sql.types进口。_ val basePath = " < storage-base-path > " val inputLocation = basePath + " < input-dir > " val outputLocation = basePath +“< output-dir >”
  2. 对于本例,我们需要生成示例数据并将其存储在DataFrame。在实际用例中,您将读取数据存储桶。
    进口org.apache.spark.sql.types。_ val sampleData = Seq(行(1,“詹姆斯”,“M”, 1000年)、行(1,“迈克尔”,20岁的“F”, 2000年)、行(2,“罗伯特”,30岁的“M”, 3000年)、行(2“玛丽亚”40岁的“F”, 4000年)、行(3,“珍”,50岁的“M”, 5000)) val sampleSchema = StructType(数组(StructField (“id”, IntegerType,真的),StructField(“名字”,StringType,真),StructField(“时代”,IntegerType,真的),StructField(“性别”,StringType,真的),StructField(“工资”,IntegerType,真)))val df = spark.createDataFrame (sc.parallelize (sampleData) sampleSchema) df.coalesce (1) .write.format(“铺”)。partitionBy (“id”,“年龄”).mode(“追加”).save (inputLocation);spark.read.format(“铺”).load (inputLocation) .count ();
  3. 创建一个定制的UDF列出所有文件的存储路径和返回每个文件的最后修改时间。
    val getModificationTimeUDF = udf((路径:字符串)= > {val finalPath =新路径(路径)val fs = finalPath.getFileSystem(参看)如果(fs.exists (finalPath)) {fs。.head listStatus(新路径(路径))。其他getModificationTime}{1 / /或其他价值基于业务决定}})
  4. 应用UDF的批处理作业。UDF返回每个文件的最后修改时间在UNIX格式。将其转换成一个人类可读的格式除以1000,然后丢的时间戳
    val df = spark.read.format(“铺”).load (inputLocation) .withColumn (“filePath input_file_name ()) .withColumn (“fileModificationTime getModificationTimeUDF (col (“filePath”))) .withColumn (“fileModificationTimestamp from_unixtime(美元/ 1000“fileModificationTime”,“yyyy-MM-dd HH: mm: ss”) .cast (TimestampType)。as(“时间戳”)).drop (“fileModificationTime”)显示(df)
  5. 应用UDF汽车加载程序流的工作。
    val自卫队= spark.readStream.format (“cloudFiles”) . schema (sampleSchema) .option (“cloudFiles。格式”、“铺”).option (“cloudFiles。includeExistingFiles”、“真实”).option (“cloudFiles。connectionString”, connectionString) .option (“cloudFiles。resourceGroup”, resourceGroup) .option (“cloudFiles。subscriptionId”, subscriptionId) .option (“cloudFiles。tenantId”, tenantId) .option (“cloudFiles。clientId”, clientId) .option (“cloudFiles。clientSecret”, clientSecret) .option (“cloudFiles。useNotifications”、“真实”).load (inputLocation) .withColumn (“filePath input_file_name ()) .withColumn (“fileModificationTime getModificationTimeUDF (col (“filePath”))) .withColumn (“fileModificationTimestamp from_unixtime (“fileModificationTime”/ 1000美元,yyyy-MM-dd HH: mm: ss) .cast (TimestampType)。as(“时间戳”)).drop (“fileModificationTime”)显示(sdf)


回顾一下,input_file_name ()是用来读取文件绝对路径,包括文件名。然后,我们创建一个定制的UDF列出所有文件的存储路径。你可以得到文件的最后修改时间从每个文件但它是列在UNIX时间格式。UNIX时间格式转换成可读的格式UNIX时间除以1000,将它转换为一个时间戳。




这篇文章有用吗?