跳转到主要内容
公司博客上”>
             <noscript>
              <img data-gatsby-image-ssr=

如何提升建立疾病预防控制中心和多路复用数据管道砖三角洲住表吗

分享这篇文章

这个博客已经联合,由Ruchira和Joydeep隆起,我们想感谢他们的贡献和思想领导采用砖Lakehouse平台。bob体育客户端下载


隆起主要现在购买支付后的解决方案,使人们从生活中得到更多,一个深思熟虑的一次购买。隆起的灵活的支付选项给购物者一个简单的,非意外现在购买,现在住,随着时间的推移和支付。

提升的解决方案集成到购买超过200商业合作伙伴,最高水平的安全、隐私和数据管理。bob体育外网下载这将确保客户享受无摩擦的在网上购物,呼叫中心和现场经验。这个巨大的合作伙伴生态系统创造挑战他们的工程团队在工程和分析数据。随着公司规模成倍增长数据是其主要价值驱动,提升需要一个极其可伸缩的解决方案,最大限度地减少基础设施的数量和“看门人代码”,它需要管理。

与数以百计的伙伴和数据源,提升利用bob体育外网下载其核心数据管道的集成驱动的见解和操作,比如:

  • 漏斗指标——应用程序,批准率、使用利率、转化率,交易量。
  • 用户指标——重复用户率、总活跃用户、新用户,生产速度,横跨海峡的购物。
  • 合伙人报告——在合作伙伴级别漏斗和收入指标。
  • 资金——合格标准、指标和监测资金资产。
  • 支付——授权批准率,重试成功率。
  • 贷款——滚,犯罪监控、复苏信贷/欺诈批准漏斗。
  • 客户支持——呼叫中心统计,队列监控、付款门户活动漏斗。

为了达到这个目标,提高杠杆砖Lakehouse平台bob体育客户端下载构建一个健壮的数据集成系统,很容易吸入和协调数以百计的主题从卡夫卡和S3对象存储。虽然每个数据源是分开存储,自动发现和吸收新来源从应用程序工程团队(数据生产者),为每个数据源和数据独立发展可用下游分析团队。

标准化lakehouse平台之前,添加新数据源和跨团队沟通更改手册,容易出错,且bob体育客户端下载耗时因为每个新源需要新写入的数据管道。使用三角洲生活表,他们的系统已经成为可伸缩、自动无功变化,且可配置的,因此时间洞察力更快通过降低笔记本的数量(从100 + 2管道)开发、管理和协调。


对于这个数据集成管道,隆起有以下要求:

  1. 提供可伸缩的能力摄取100 +主题从卡夫卡/ S3 Lakehouse,三角洲湖基础,分析师可以利用它的原始状态表中的格式。
  2. 提供一个灵活的层动态创建一个表新卡夫卡的话题,可以到达任何地方。这允许容易新的数据发现和探索。
  3. 自动更新模式变化为每个主题从卡夫卡作为数据更改。
  4. 为下游层可配置提供显式规则如表模式执行,数据质量预期,数据类型映射,默认值等,以确保产品化表正常管理。
  5. 确保数据管道可以处理化合物1型更新所有明确的配置表。
  6. 允许下游应用程序创建聚合汇总统计和趋势。

这些需求作为合适的用例设计模式“复用”。多路复用时使用的一组独立的溪流所有共享相同的源在这个例子中,我们有一个消息队列和一系列S3 bucket卡夫卡100年代改变事件的原始数据被插入到一个三角洲表,我们想摄取并行和解析。

注意,多路复用是一个复杂的流设计模式有不同的权衡创建一对一源目标流的典型模式。如果多路复用是你正在考虑,但还没有实施,这将有助于开始在这里这让生产视频流,涵盖了许多最佳实践基本流,以及实现这个设计模式的权衡。

让我们回顾一下这个用例,利用两个通用解决方案大奖章架构使用三角洲湖。这是一个基本的框架,支持以下两种解决方案。


多路复用的解决方案:

  • 火花结构化流砖许多流使用foreachBatch方法使用一个。这个解决方案表读取青铜阶段,将单流分为micro-batch内部的多个表。
  • 砖三角洲的生活表(DLT)是用于创建和管理所有并行流。这个过程使用单个输入表动态识别所有独特的主题在铜表和为每个生成独立的溪流,而不需要显式地为每个主题编写代码和管理检查点。

*本文其余部分假设您有接触火花结构化流和介绍三角洲生活表

在我们的示例中,δ生活表提供了一种声明性管道,使我们能够提供一个配置的表定义在一个高度灵活的架构管理。DLT可以定义一个数据管道,流,和100年代管理表的一个可配置的管道不丢失表级别的灵活性。例如,一些下游表可能需要每天运行一次,而另一些需要实时分析。所有这一切现在可以管理一个数据管道

在我们深入三角洲生活表(DLT)解决方案,指出现有的解决方案设计是有帮助的砖上使用结构化流火花。

解决方案1:多路复用在砖使用δ+结构化流火花

这个结构化流设计模式的架构如下所示:

多路复用使用δ+火花结构化流在砖结构”height=

在结构化流任务中,流将读取多个主题从卡夫卡,然后在一个流解析表中多个表foreachBatch声明。下面的代码块作为一个例子写多个表在一个流。

df_bronze_stage_1 = spark.readStream。格式(json) .load(<路径>)defwriteMultipleTables(microBatchDf, BatchId):
              df_topic_1 = (microBatchDf过滤器(坳(“主题”)= =点燃(“topic_1”)))
              df_topic_2 = (microBatchDf过滤器(坳(“主题”)= =点燃(“topic_2”)))
              df_topic_3 = (microBatchDf过滤器(坳(“主题”)= =点燃(“topic_3”)))
              df_topic_4 = (microBatchDf过滤器(坳(“主题”)= =点燃(“topic_4”)))
              df_topic_5 = (microBatchDf过滤器(坳(“主题”)= =点燃(“topic_5”)))# # #应用模式# #查找模式注册表,检查每个事件类型的事件是最近注册的模式,注册新模式# # # # #写水槽位置(在microBatch系列)df_topic_1.write。格式(“δ”).mode (“覆盖”).option (“路径”,“/数据/ dlt_blog / bronze_topic_1”).saveAsTable (“bronze_topic_1”)df_topic_2.write。格式(“δ”).option (“mergeSchema”,“真正的”).option (“路径”,“/数据/ dlt_blog / bronze_topic_2”).mode (“覆盖”).saveAsTable (“bronze_topic_2”)df_topic_3.write。格式(“δ”).mode (“覆盖”).option (“路径”,“/数据/ dlt_blog / bronze_topic_3”).saveAsTable (“bronze_topic_3”)df_topic_4.write。格式(“δ”).mode (“覆盖”).option (“路径”,“/数据/ dlt_blog / bronze_topic_4”).saveAsTable (“bronze_topic_4”)df_topic_5.write。格式(“δ”).mode (“覆盖”).option (“路径”,“/数据/ dlt_blog / bronze_topic_5”).saveAsTable (“bronze_topic_5”)返回使用每一批- microBatchMode # # #(df_bronze_stage_1#这是readStream数据帧.writeStream.trigger (availableNow =真正的)# ProcessingTime = 30秒的.option (“checkpointLocation”checkpoint_location).foreachBatch (writeMultipleTables).start ())
              > < /路径

有一些关键设计考虑在火花结构化流媒体解决方案。

在结构化流流一对多表,我们需要使用一个foreachBatch函数,并提供每个microBatch的表里面写函数(参见上面的例子)。这是一个非常强大的设计,但它有一些限制:

  1. 可伸缩性:写一对多表是容易的几个表,但不能扩展表,这将意味着100年代的所有表都写在系列(由于火花代码运行,每个写声明需要完成下一个开始前)在默认情况下,上面的代码示例所示。这将增加显著整体工作运行时为每个表补充道。
  2. 复杂性:写的是硬编码的,也就是说,没有简单的方法来自动发现新的主题和创建表前进的新话题。每次到一个新的数据源,需要一个代码发布。这是一个很大的水池,使管道脆性。这是可能的,但需要大量的开发工作。
  3. 刚性:表可能需要刷新速度不同,有不同的质量预期,和不同的预处理等逻辑分区或数据布局的需要。这需要创建完全独立工作刷新不同组的表。
  4. 效率:这些表会有截然不同的数据量,所以如果他们都使用相同的流媒体集群,集群将会有时间,没有充分利用。负载平衡这些流需要开发工作和更有创造性的解决方案。

总的来说,这个解决方案是有效的,但是,可以解决的挑战,进一步解决方案进一步简化单一的DLT管道


解决方案2:多路复用+疾控中心在Python中使用砖三角洲的生活表

容易满足上面的要求(自动发现新表,并行流处理在一个工作,数据质量的实施,由表模式演化,并执行中心插入在所有表的最后阶段),我们使用三角洲生活表元编程模型在Python中声明并建立所有表为每个阶段并行。

这个解决方案的架构三角洲生活表如下:

多路复用+疾控中心在Python中使用砖三角洲的生活表架构”height=

这是1工作由2来完成任务:

  1. 任务:一个readStream原始数据从卡夫卡的话题青铜阶段1成一个单一的δ表。任务然后创建一个视图的不同主题的流。(你也可以选择使用一个模式注册表显式存储和使用每个主题的有效载荷的模式来解析下一个任务,这个视图可以认为模式注册或使用任何其他模式管理系统)。在本例中,我们仅仅从每一个JSON负载动态推断所有模式为每个主题,并执行数据类型转换下游银阶段。
  2. 任务B:一个三角洲生活表管道流的青铜阶段1使用视图生成的首要任务的配置,然后使用元编程模型来创建青铜第二阶段表视图中的每个主题目前每次触发。

    相同的DLT管道则读取一个显式配置(在这种情况下一个JSON配置)注册“产品化”表实施更严格的数据质量的期望和数据类型。在这个阶段,管道清洗青铜第二阶段表,然后实现申请变更成方法按产品分类的表合并到最终的更新银色的阶段

    最后,黄金阶段从创建聚合银色的阶段代表分析表来摄取报告服务。


多路复用实现步骤+疾控中心在三角洲住表

下面是个体实现步骤建立一个多路复用管道+疾控中心在三角洲住表:

  1. 生青铜阶段1——阅读代码范例的话题从卡夫卡和储蓄青铜阶段1δ表。
  2. 创造独特的主题/事件的看法从青铜阶段1—创建视图。
  3. 扇出单铜第一阶段个人表——青铜第二阶段代码示例(元编程)的观点。
  4. 带铜银阶段第二阶段表——代码示例演示从银配置层元编程模型以及银表管理配置的例子。
  5. 创建黄金总量——三角洲生活中的代码范例表创建完成黄金汇总表。
  6. DLT管道哒G -测试和运行DLT管道从青铜第一阶段到黄金。
  7. DLT管道配置——配置三角洲生活与任何参数表管道,集群定制,和其他所需的配置更改在生产中实施。
  8. 多任务创造就业机会步骤1和步骤2 - 7日(所有一个DLT管道)到一个砖工作,哪里有2串联运行的任务。

步骤1:原始青铜阶段1 -阅读代码范例的话题从卡夫卡和储蓄青铜阶段1δ表。

startingOffsets =“最早”
              卡夫卡= (spark.readStream格式(“卡夫卡”).option (“kafka.bootstrap.servers”kafka_bootstrap_servers_plaintext).option (“订阅”、主题).option (“startingOffsets”startingOffsets).load ())read_stream = (kafka.select(坳(“关键”).cast (“字符串”).alias (“主题”)、坳(“价值”).alias (“有效载荷”)))
              (read_stream.writeStream格式(“δ”).mode (“添加”).option (“checkpointLocation”checkpoint_location).option (“路径”,< option_output_file_path >)saveAsTable (“PreBronzeAllTypes”))
              < / option_output_file_path >

步骤2:创建独特的主题/事件的看法

%sql创建视图如果存在dlt_types_config作为选择截然不同的话题,sub_topic——其他事情,如从注册表模式,从卡夫卡或其他有用的元数据PreBronzeAllTypes;

步骤3:扇出单铜第一阶段个人表

%pythonbronze_tables=spark.read.table (“cody_uplift_dlt_blog.dlt_types_config”)# #截然不同的列表已经成功我们通过视图定义topic_list=[[我[0),我1]]bronze_tables.select(坳(“主题”)、坳(“sub_topic”))。合并(1)。收集())
              打印(topic_list)
进口再保险defgenerate_bronze_tables(话题,sub_topic):topic_clean = re.sub (“/”,“_”re.sub (“-”,“_”、主题)sub_topic_clean = re.sub (“/”,“_”re.sub (“-”,“_”,sub_topic))@dlt.table (name =f”bronze_{topic_clean}_{sub_topic_clean},评论=f”青铜表主题:{topic_clean}sub_topic:{sub_topic_clean})defcreate_call_table():# #现在这是在DLT DAG的开始df = spark.readStream.table (“cody_uplift_dlt_blog.PreBronzeAllTypes”)。过滤器((坳(“主题”)= =主题)&(坳(“sub_topic”)= = sub_topic))# #通过readStream到任何预处理函数返回一个流媒体数据帧df_flat = _flatten (df、主题、sub_topic)返回df_flat
话题,sub_topictopic_list:#打印(f”构建{主题}事件类型{sub_topic}”)sub_topic generate_bronze_tables(主题)

步骤4:将铜银阶段第二阶段表

代码示例演示从银配置层元编程模型以及银表管理配置的例子。”height=

定义DLT函数和表生成青铜第二阶段转换配置

defgenerate_bronze_transformed_tables(source_table、trigger_interval partition_cols zorder_cols column_rename_logic =drop_column_logic =):@dlt.table (name =f”bronze_transformed_{source_table},table_properties = {“质量”:“青铜”,“pipelines.autoOptimize.managed”:“真正的”,“pipelines.autoOptimize.zOrderCols”:zorder_cols,“pipelines.trigger.interval”:trigger_interval})deftransform_bronze_tables():source_delta = dlt.read_stream (source_table)transformed_delta =eval(f”source_delta{column_rename_logic}{drop_column_logic})返回transformed_delta

定义函数来生成银表与疾控中心在三角洲住表

defgenerate_silver_tables(target_table, source_table、merge_keys where_condition、trigger_interval partition_cols, zorder_cols expect_all_or_drop_dict column_rename_logic =drop_column_logic =):
              # # # #定义DLT表这种方式如果我们想地图列@dlt.view (name =f”silver_source_{source_table})@dlt.expect_all_or_drop (expect_all_or_drop_dict)defbuild_source_view():#source_delta = dlt.read_stream (source_table)transformed_delta =eval(f”source_delta{column_rename_logic}{column_rename_logic})返回transformed_delta#返回dlt.read_stream (f”bronze_transformed_ {source_table}”)# # #创建目标表的定义dlt.create_target_table (name = target_table评论=f”干净,合并{target_table},# partition_cols =(“主题”),table_properties = {“质量”:“银”,“pipelines.autoOptimize.managed”:“真正的”,“pipelines.autoOptimize.zOrderCols”:zorder_cols,“pipelines.trigger.interval”:trigger_interval})# #合并dlt.apply_changes (目标= target_table,源=f”silver_source_{source_table},键= merge_keys,# = where_condition, # f”{来源}.Column)坳({目标}.Column)”sequence_by =坳(“时间戳”),任何形式的#主键,自动递增的ID,可以用于身份的事件,或时间戳ignore_null_updates =)返回

让银表配置和通过合并功能

表、配置silver_tables_config.items ():# # # # #构建从配置文件转换查询逻辑# # # # ##所需的格式重命名列result_renamed_columns = []renamed_column, coalesced_columnsconfig.get (“renamed_columns”)[0). items ():renamed_col_result = []范围(0,len(coalesced_columns)):renamed_col_result.append (f”坳('{coalesced_columns[我]}')")result_renamed_columns.append (f”.withColumn ('{renamed_column}”,合并({”、“. join (renamed_col_result)}))”)#删除重命名列result_drop_renamed_columns = []renamed_column, dropped_columnconfig.get (“renamed_columns”)[0). items ():dropped_column:result_drop_renamed_columns.append (f“.drop(坳('{项}'))”)
              # pk NULL检查所需的格式where_conditions = []config.get (“upk”):where_conditions.append (f”{项}不是空”)source_table = config.get (“source_table_name”)upks = config.get (“upk”)# # #表级别属性trigger_interval = config.get (“trigger_interval”)partition_cols = config.get (“partition_columns”)zorder_cols = config.get (“zorder_columns”)column_rename_logic =. join (result_renamed_columns)drop_column_logic =. join (result_drop_renamed_columns)expect_all_or_drop_dict = config.get (“expect_all_or_drop”)打印(f”“目标表:{表}\ n源表:{source_table}\ n:{upks}\ n重命名列:{result_renamed_columns}\ n删除取代列:{renamed_col_result}\ n以下条件:{where_conditions}。\ n列重命名逻辑:{column_rename_logic}\ n删除列逻辑:{drop_column_logic}\ n \ n”“”)# # #做疾病预防控制中心独立于转换generate_silver_tables (target_table =表,source_table = config.get (“source_table_name”),trigger_interval = trigger_interval,partition_cols = partition_cols,zorder_cols = zorder_cols,expect_all_or_drop_dict = expect_all_or_drop_dict,merge_keys = upks,where_condition = where_conditions,column_rename_logic = column_rename_logic,drop_column_logic = drop_column_logic)

第五步:创建黄金总量

创建黄金聚合表

@dlt(的名字=“Funnel_Metrics_By_Day”,table_properties={“质量”:“黄金”})def getFunnelMetricsByDay ():summary_df=(dlt.read (Silver_Finance_Update) .groupBy (date_trunc (“天”坳(“时间戳”)).alias(“日期”)).agg ((坳(“时间戳”)).alias (“DailyFunnelMetrics”)))返回summary_df

第六步:DLT管道DAG -把它结合在一起就形成了以下DLT管道:

DLT管道DAG -测试和运行DLT管道从青铜第一阶段到黄金”height=

第七步:DLT管道配置

{“id”:“c44f3244 b5b6 - 4308 - 5 - c9c1fafd37a金属”,“名称”:“UpliftDLTPipeline”,“存储”:“dbfs: /管道/ c44f3244 b5b6 - 4308 - 5 - c9c1fafd37a金属”,“配置”:{“pipelines.applyChangesPreviewEnabled”:“真正的”},“集群”:【{“标签”:“默认”,“自动定量”:{“min_workers”:1,“max_workers”:5}}),“库”:【{“笔记本”:{“路径”:“/流媒体演示/ UpliftDLTWork / DLT -铜层”}},{“笔记本”:{“路径”:“流/用户/ DataEngineering /演示/ UpliftDLTWork / DLT -银层”}}),“目标”:“uplift_dlt_blog”,“连续”:,“发展”:真正的}

在这个设置配置,在这里您可以设置管道水平参数,云像我实例配置文件配置,集群配置,等等。看到下面的文档可用的DLT配置的完整列表。

第八步:多任务创造就业机会——结合DLT管道和预处理步骤1的工作

多任务创造就业机会——结合DLT管道和预处理步骤一个砖工作,哪里有2串联运行的任务。”height=

δ生活表中,我们可以通过配置独立控制每个表的所有方面的表而不改变管道代码。这简化了管道的变化,大大增加可伸缩性先进的自动伸缩由于并行,提高效率代表。最后,整个100 +所有支持在一个工作表管道所有流基础设施来一个简单的配置进行了抽象,并管理所有支持表的数据质量的管道在一个简单的UI。前三角洲生活表,管理这样一个管道的数据质量和血统将手册,非常耗费时间。

这是一个很好的例子,三角洲的生活方式表简化了数据工程师和工程经验,同时允许数据分析师(您还可以创建DLT管道在所有SQL)来构建复杂的管道,需要数百小时内部建设和管理。

最终,增量住表可以提升专注于提供更聪明和更有效的产品为他们的合作伙伴,而不是争论每个数据源与数千行“看门人代码”。bob体育外网下载

免费试着砖

相关的帖子

看到所有公司博客上的帖子