利用火花结构化流扩展分析
2022年7月14日 在客户
这篇文章的米科学数据科学与工程团队。
现代数据不会停止生长
“工程师教的生活经验,做一些快速和做正确的事是互斥的!与结构化流从砖,M科学得到速度和准确度分析平台,而不需要每次都从头开始重建我们的基础设施。”bob体育客户端下载- Ben Tallman, CTO
假设你,大数据时代的“卑微的数据管道工”和创建一个任务是分析解决方案在线零售数据集:
发票 没有 |
股票 代码 |
描述 | 数量 | 发票 日期 |
单位 价格 |
客户 ID |
国家 |
---|---|---|---|---|---|---|---|
536365年 | 85123一个 | 他叫白挂 | 6 | 2012-01-10 | 2.55 | 17850年 | 联合王国 |
536365年 | 71053年 | 白色金属灯 | 6 | 2012-01-10 | 3.39 | 17850年 | 联合王国 |
536365年 | 84406 b | 奶油丘比特的心 | 8 | 2012-01-10 | 2.75 | 17850年 | 联合王国 |
… | … | … | … | … | … | … | … |
分析你一直要求很简单——一个聚合数量的美元,单位出售,和独特的用户每天,在每只股票代码。只有几行PySpark,我们可以将我们的原始数据转换成有用的聚合:
进口pyspark.sql.functions作为Fdf=spark.table (“default.online_retail_data”)agg_df=(df#集团数据通过月、项目代码和国家.groupBy (“InvoiceDate”,“StockCode”,)#返回汇总的美元,销量,和独特的用户.agg (F。总和(“UnitPrice”).alias(“美元”),F。总和(“数量”).alias(“单位”),F.countDistinct (“CustomerID”).alias(“用户”),))
(agg_df.write.format (“δ”).mode (“覆盖”).saveAsTable (“analytics.online_retail_aggregations”))
一起和你的新聚合数据,你可以把一个很好的可视化做……业务的事情。
这工作,对吧?
ETL过程的一个静态分析,你不期望的数据被更新,假设您已经将是唯一的数据数据你曾经拥有的。一个静态分析的问题吗?
你打算做什么,当你得到更多的数据?
天真的回答是每天只运行相同的代码,但是你处理文档的所有数据每次运行代码,和每一个新的更新意味着你已经处理加工数据。当你的数据变得足够大时,你会加倍的在时间和计算成本。
使用静态分析,你把钱花在你已经处理加工数据。
很少有现代数据源,不会被更新。如果你想保持你的分析与数据来源和计算成本节省一大笔钱,你需要一个更好的解决方案。
我们做什么当我们的数据增长?
在过去的几年里,“大数据”已经成为…缺乏。大量的数据增长和更多的生活已经在线,大数据的时代已经成为时代的“帮助我们,它只是不会停止越来越大的数据。”A good data source doesn't stop growing while you work; this growth can make keeping data products up-to-date a monumental task.
在米科学,我们的使命是用典型的季度报告以外的替代数据-数据或股票趋势数据来源,分析、提炼,并预测市场的变化和经济。
每一天,我们的分析师和工程师面临一个挑战:替代数据增长非常快。我甚至说,如果我们的数据停止增长,一些经济已经非常非常错误的。
随着数据的增长,我们的分析解决方案需要处理这一增长。我们不仅需要考虑经济增长,但我们也需要考虑可能会在后期或无序的数据。这是我们的使命的一个重要部分——每一批新的数据可以批处理经济信号一个戏剧性的变化。
分析产品,使可伸缩解决方案米科学分析师和客户依赖每一天,我们使用砖结构的流,一个Apache火花™API具有可扩展性和容错性流处理建立在火花的SQL引擎与砖Lakehouse的平台。bob体育客户端下载结构化流媒体向我们保证,随着数据的增长,我们的解决方案也将规模。
使用火花结构化流
结构化流进场当新批次的数据被引入您的数据源。结构化流利用三角洲湖跟踪数据变化的能力确定哪些数据是一个更新的一部分,重新计算的部分分析影响的新数据。
重新考虑你如何思考是很重要的流数据。对许多人来说,“流”是指实时数据流电影,查看Twitter,查看天气,等等。如果你分析师、工程师或科学家,任何数据,更新是一个流。更新的频率并不重要。秒,几个小时,几天,甚至几个月——如果数据更新,数据流。如果数据流,那么结构化流可以节省很多的麻烦。
使用结构化的流,您可以避免的成本加工之前的数据
让我们回到我们的假设——你有一个聚合分析,今天你需要提供和保持更新新的数据卷。这一次,我们有DeliveryDate
列提醒我们我们之前的单发徒劳的分析:
发票 没有 |
股票 代码 |
描述 | 数量 | 发票 日期 |
交付 日期 |
单位 价格 |
客户 ID |
国家 |
---|---|---|---|---|---|---|---|---|
536365年 | 85123一个 | 他叫白挂 | 6 | 2012-01-10 | 2012-01-17 | 2.55 | 17850年 | 联合王国 |
536365年 | 71053年 | 白色金属灯 | 6 | 2012-01-10 | 2012-01-15 | 3.39 | 17850年 | 联合王国 |
536365年 | 84406 b | 奶油丘比特的心 | 8 | 2012-01-10 | 2012-01-16 | 2.75 | 17850年 | 联合王国 |
… | … | … | … | … | … | … | … | … |
值得庆幸的是,结构化的界面流非常类似于您的原始PySpark片段。这是你的原始静态批量分析代码:
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =# = = = = =老静态批代码= = = = =# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =进口pyspark.sql.functions作为Fdf = spark.table (“default.online_retail_data”)
agg_df = (df#组数据按日期和项目代码.groupBy (“InvoiceDate”,“StockCode”,)#返回汇总美元,销量,和独特的用户.agg (F。总和(“UnitPrice”).alias (“美元”),F。总和(“数量”).alias (“单位”),F.countDistinct (“CustomerID”).alias (“用户”),))
(agg_df.write。格式(“δ”).mode (“覆盖”).saveAsTable (“analytics.online_retail_aggregations”))
只有一些调整,我们可以调整这个利用结构化流。把你以前的代码,你会:
- 阅读我们的输入表流而不是一个静态批数据
- 做一个目录在您的文件系统检查点将存储
- 设置一个水印建立边界多晚数据可以被忽略在分析前到达
- 修改你的一些转换防止保存的检查点状态太大
- 写下你的最终分析表作为流增量处理输入数据
我们应用这些调整,贯穿每一个变化,给你几个选择如何配置你流的行为。
这是‚‚“stream-ified”
旧版本的代码:
#=========================================#=====新结构化流代码=====#=========================================+CHECKPOINT_DIRECTORY=“/δ/检查点/ online_retail_analysis”+dbutils.fs.mkdirs (CHECKPOINT_DIRECTORY)+df=spark.readStream.table (“default.online_retail_data”)agg_df=(df+#水印数据与一个InvoiceDate的7天+.withWatermark (“InvoiceDate”, f“7天”)#集团数据通过日期&项目代码.groupBy (“InvoiceDate”,“StockCode”,)#返回汇总的美元,销量,和独特的用户.agg (F。总和(“UnitPrice”).alias(“美元”),F。总和(“数量”).alias(“单位”),+F.approx_count_distinct (“CustomerID”,0.05).alias(“用户”),))
(+agg_df.writeStream.format(“δ”)+.outputMode(“更新”)+。触发(一次=真正的)+CHECKPOINT_DIR .option (“checkpointLocation”)+.toTable (“analytics.online_retail_aggregations”))
让我们重复每个调整我们的结构化流工作:
1。流从三角洲表
+ df = spark.readStream.table (“default.online_retail_data”)
δ所有表的漂亮的特性,这可能成为:你可以把它当做一个流。因为增量跟踪更新,你可以使用.readStream.table ()
流新更新每次运行流程。
重要的是要注意,你的输入表必须是一个三角洲表工作。有可能流其他数据格式有不同的方法,但是.readStream.table ()
需要三角洲的表
+#创建检查点目录+ CHECKPOINT_DIRECTORY =“/δ/检查点/ online_retail_analysis”+ dbutils.fs.mkdirs (CHECKPOINT_DIRECTORY)
在结构化Streaming-jargon,在这个分析是一个聚合有状态的转换。不太远的杂草、结构化流节省聚合的状态作为一个检查点每次更新分析。
这就是救了你在计算成本:而不是加工所有更新的数据每次都从头开始,简单的收拾,最后更新。
3所示。定义一个水印
+#水印数据的InvoiceDate 7天+ .withWatermark (“InvoiceDate”f“7天”)
当你得到新的数据,有一个好的机会,你可以接收数据无序。水印你的数据允许您定义一个截止追溯到总量如何被更新。从某种意义上说,它会创建一个“活”和“定居”数据之间的界限。
产品说明:假设这数据包含的7月数据。我们设置水印到7天。这意味着骨料从7日到1日还“活”。新的更新可以改变聚合物从1日到7日,但任何新的数据,落后超过7天不被包括在更新- - - - - -聚合前1日“定居”,和更新的时期将被忽略。
新数据以外的水印不纳入分析。
重要的是要注意,列使用水印必须是一个时间戳或窗口。
4所示。使用结构化Streaming-compatible转换
+ F.approx_count_distinct (“CustomerID”,0.05)
为了保持你的检查站从膨胀状态,你可能需要更换你的一些转换更多storage-efficient替代品。列,可能包含很多独特的个人价值,approx_count_distinct
函数将得到你的结果在一个定义的相对标准偏差。
5。创建输出流
+ agg_df.writeStream。格式(“δ”)+ .outputMode (“更新”)= + .trigger(一次真正的)+ .option (“checkpointLocation”CHECKPOINT_DIR)+ .toTable (“analytics.online_retail_aggregations”)
最后一步是输出分析三角洲表。这几个选项,确定你流的行为:
.outputMode(“更新”)
配置流,聚合将上次每次运行的代码,而不是从头开始运行。从头开始重新聚合,可以使用“完整的”
——实际上,做一批传统聚合,同时仍然保留的聚合状态的未来“更新”
运行。触发(一旦= True)
将触发查询一次,当行输出代码开始,然后停止查询,一旦所有的新数据被处理。“checkpointLocation”
让程序知道检查点应该存储。
这些配置选项使流最接近像原来的一次性的解决方案。
这一起来创建一个可伸缩的解决日益增长的数据。如果新数据添加到你的来源,你会考虑新的数据分析没有成本一只手臂和一条腿。
你会很难找到任何上下文数据不会被更新。软协议,数据分析师,工程师和科学家当我们工作与现代数据——它会增长,我们必须找到方法来处理这种增长。
火花结构化流,我们可以使用最新和最优数据提供最好的产品,没有规模的头痛。