用户定义的聚合函数- Scala

本文包含一个UDAF以及如何注册它使用Apache火花SQL。看到用户定义的聚合函数(UDAFs)为更多的细节。

实现一个UserDefinedAggregateFunction

进口orgapache火花sql表达式MutableAggregationBuffer进口orgapache火花sql表达式UserDefinedAggregateFunction进口orgapache火花sql进口orgapache火花sql类型_GeometricMean扩展UserDefinedAggregateFunction{/ /这是聚合函数的输入字段。覆盖definputSchema:orgapache火花sql类型StructType=StructType(StructField(“价值”,倍增式)::)/ /这是你保持的内部字段计算你的总。覆盖defbufferSchema:StructType=StructType(StructField(“数”,LongType)::StructField(“产品”,倍增式)::)/ /这是你aggregatation函数的输出类型。覆盖def数据类型:数据类型=倍增式覆盖def确定的:布尔=真正的/ /这是缓冲模式的初始值。覆盖def初始化(缓冲:MutableAggregationBuffer):单位={缓冲(0)=0 l缓冲(1)=1.0}/ /这是如何更新给定一个输入缓冲模式。覆盖def更新(缓冲:MutableAggregationBuffer,输入:):单位={缓冲(0)=缓冲木屐()(0)+1缓冲(1)=缓冲木屐()(1)*输入木屐()(0)}/ /这是如何与bufferSchema合并两个对象类型。覆盖def合并(buffer1:MutableAggregationBuffer,buffer2:):单位={buffer1(0)=buffer1木屐()(0)+buffer2木屐()(0)buffer1(1)=buffer1木屐()(1)*buffer2木屐()(1)}/ /这是你输出最终的价值,给你bufferSchema的最终值。覆盖def评估(缓冲:):任何={数学战俘(缓冲(1),1toDouble/缓冲getLong(0))}}

注册与火花UDAF SQL

火花udf注册(“通用汽车”,GeometricMean)

用你UDAF

/ /创建一个DataFrame并引发SQL表进口orgapache火花sql功能_瓦尔id=火花范围(1,20.)idcreateOrReplaceTempView(“id”)瓦尔df=火花sql(“选择id, id % 3从ids group_id”)dfcreateOrReplaceTempView(“简单”)
——使用UDAF group_by声明和调用。选择group_id,通用汽车(id)简单的集团通过group_id
/ /或者使用DataFrame语法调用聚合函数。/ /创建一个实例的UDAF GeometricMean。瓦尔通用汽车=GeometricMean/ /显示的列值的几何平均数“id”。dfgroupBy(“group_id”)。gg(通用汽车(上校(“id”))。作为(“GeometricMean”))。显示()/ /调用UDAF由其指定的名字。dfgroupBy(“group_id”)。gg(expr(“通用汽车(id) GeometricMean”))。显示()