公司博客上

实时机器学习推理的基础设施设计

通过陈昱

2021年9月1日 公司博客上

分享这篇文章

本文由Headspace高级软件工程师Yu Chen撰写。

顶部空间的其核心产品是iOS、Android和基于网络的应用程序,专注于通过正念、冥想、睡眠、锻炼和专注内容来改善用户的健康和幸福。机器学习(ML)模型是我们用户体验的核心,它为用户提供新的相关、个性化的内容,让用户在他们的终身旅程中养成一致的习惯。

当输入到ML模型的数据可以立即用于即时决策时,通常是最有价值的,但是,传统上,消费者数据在机器学习和数据分析团队利用它之前,会被摄取、转换、持久化并休眠很长一段时间。

找到一种利用用户数据生成实时洞察和决策的方法,意味着像Headspace应用程序这样面向消费者的产品可以极大地缩短端到端用户反馈循环:用户在片刻之前执行的操作可以被整合到产品中,为用户生成更相关、个性化和针对具体情况的内容推荐。

这意味着我们的ML模型可以包含动态特征,这些特征可以在用户一天的过程中更新,甚至可以在单个会话中更新。这些特性的例子包括:

  • 当前会话跳出率的睡眠内容
  • 最近用户搜索词的语义嵌入。例如,如果一个用户最近搜索了“准备大考”,ML模型可以为符合这一目标的聚焦主题冥想分配更多权重。
  • 用户的生物特征数据(例如,如果步数和心率在过去10分钟内增加,我们可以推荐Move或Exercise内容)

考虑到用户体验,Headspace机器学习团队通过将基础设施系统分解为模块化发布、接收器、编制和上菜层。该方法利用了Apache Spark™,结构化数据流,AWS SQS, Lambda和Sagemaker为我们的ML模型提供实时推理能力。

在这篇博文中,我们将从技术上深入探讨我们的架构。在描述了我们对实时推理的需求之后,我们讨论了适应传统离线ML工作流以满足我们需求的挑战。然后,在讨论关键体系结构组件的细节之前,我们将给出体系结构概述。

实时推断需求

为了方便实时推断,个性化用户的内容推荐,我们需要

  • 消化、处理、前进我们的用户在我们的客户端应用程序(iOS, Android, web)上执行的相关事件(操作)
  • 快速计算,存储和获取在线功能(毫秒延迟),丰富了实时推理模型使用的特征集
  • 提供并重新加载实时推理模型以一种同步服务模型与在线功能商店的方式,同时最小化(理想情况下避免)任何停机时间。

我们大致的端到端延迟目标(从用户事件转发到Kinesis流到实时推断预测)是30秒

适应传统ML模型工作流的挑战

上述需求通常是离线模型无法解决(也不需要解决)的问题,这些模型提供日常批处理预测。从ELT / ETL数据管道提取和转换的记录中进行推断的ML模型通常具有原始事件数据的几个小时交货时间.传统上,ML模型的训练和服务工作流将涉及以下步骤,通过每隔几小时或每天运行一次的周期性作业执行:

  • 从上游数据存储中提取相关原始数据:对于Headspace,这涉及到使用Spark SQL从我们的数据工程团队维护的上游数据湖进行查询。
    • 对于实时推断:我们每秒会遇到多达数千个预测请求,因此使用SQL从后端数据库进行查询会带来不可接受的延迟。虽然模型训练需要提取完整的数据集,但实时推断通常涉及同一数据的小的、单个用户子集切片。因此,我们使用AWS Sagemaker在线特征组,它能够以个位数毫秒响应时间获取和写入单个用户特征(图中的第三步).
  • 执行数据预处理(特征工程,特征提取等)使用SQL和Python的混合。
    • 对于实时推断:我们用Sagemaker功能存储组的实时功能丰富了Spark结构化流微批原始事件数据。
  • 训练模型并记录相关实验指标:使用MLflow,我们可以注册模型,然后在Databricks Notebook界面中记录它们在不同实验运行中的性能。
  • 将模型持久化到磁盘:当MLflow记录一个模型时,它使用ML库的本机格式序列化该模型。例如,scikit-learn模型序列化使用泡菜图书馆
  • 对相关推理数据集进行预测:在这种情况下,我们使用新训练的推荐模型为我们的用户群生成新的内容推荐。
  • 坚持将预测提供给用户。这取决于生产中的访问模式,以便将ML预测交付给最终用户。
    • 对于实时推断:我们可以将预测注册到我们的Prediction Service,以便导航到ml支持的选项卡的最终用户可以下拉预测。或者,我们可以将预测转发到另一个SQS队列,该队列将通过推送iOS/Android通知发送内容推荐。
  • 编制:传统的批量推理模型利用像气流这样的工具来安排和协调不同的阶段/步骤。
    • 对于实时推断:我们使用轻量级Lambda函数以适当的消息传递格式解包/打包数据,调用实际的Sagemaker端点并执行任何所需的后处理和持久化。

Headspace的RT ML推断架构的高级概述。

用户通过在Headspace应用程序中执行操作来生成事件,这些事件最终会被转发到我们的Kinesis Streams中,由Spark Structured Streaming进行处理。用户应用程序通过向我们的后端服务发出RESTful HTTP请求来获取接近实时的预测,传递他们的用户id和特征标志来指示要发送回哪种类型的ML建议。体系结构的其他组件将在下面更详细地描述。

发布和服务层:模型训练和部署生命周期

ML模型在Databricks notebook中开发,并通过MLflow实验对核心离线指标进行评估,例如在k处召回推荐系统。Headspace ML团队已经编写了包装器类,扩展了MLflow中的基本Python函数模型类:

这个MLflow上下文管理器允许实验运行(参数和指标)被跟踪并易于查询MLModel.mlflow.start_run ()作为运行:省略数据转换和特征预处理代码(样板代码)...#模型构建lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42#培训lr。fit (train_x train_y)#评估模型性能predicted_traits = lr.predict(test_x)(rmse, mae, r2) = eval_metrics(test_y, predicted_质量)在我们的自定义包装类中包装模型模型= ScikitLearnModel(lr)model.log_params(…)model.log_metrics(…)在ML跟踪服务器中记录运行结果#可选地保存模型工件到对象存储和注册模型(给它一个语义版本)#所以它可以被构建到一个sagemaker服务的Docker映像中model.save(注册=真正的

在github上查看

Headspace ML团队的模型包装类调用MLflow自己的save_model方法来执行大部分实现逻辑,在我们的ML Models S3桶中创建一个目录,其中包含构建MLflow模型Docker映像所需的元数据、依赖项和模型构件:

实时机器学习推理的基础设施设计

然后,我们可以创建一个正式的Github Release,指向我们刚刚保存在S3中的模型。这可以通过CircleCI等CI/CD工具获取,这些工具可以测试和构建MLflow模型映像,这些映像最终会被推送到AWS ECR,并部署到Sagemaker模型端点上。

更新和重新加载实时模型

我们经常重新训练我们的模型,但是在生产中更新实时推理模型是很棘手的。AWS有多种部署模式(逐步推出、金丝雀等),我们可以利用这些模式来更新实际服务的Sagemaker模型。然而,实时模型也需要同步的在线功能商店,考虑到Headspace的用户基础规模,完全更新可能需要30分钟。假设我们不希望每次更新模型图像时都停机,我们需要小心确保我们的特征存储与我们的模型图像同步。

例如,一个将Headspace用户ID映射到用户序列ID的模型,作为协同过滤模型的一部分——我们的特征存储必须包含用户ID到序列ID的最新映射。除非用户群体保持完全静态,否则如果我们只更新模型,我们的用户ID将在推理时映射到陈旧的序列ID,导致模型为随机用户而不是目标用户生成预测。

蓝绿色的架构

为了解决这个问题,我们可以采用蓝绿色体系结构,该体系结构遵循蓝绿色部署的DevOps实践。工作流程如下:

Headspace的RT ML推断架构的高级概述。

  • 维护两个并行的基础设施(在本例中是两个特性存储的副本)。
  • 指定一个作为生产环境(让我们称其为“绿色”环境,开始),并通过Lambda将特性和预测请求路由到它。
  • 每次我们希望更新我们的模型时,我们使用批处理过程/脚本用最新的特性更新补充的基础设施(“蓝色”环境)。更新完成后,将Lambda切换为指向蓝色生产环境的特性/预测。
  • 每次我们想要更新模型(及其相应的特征存储)时都重复这个步骤。

接收层:使用Apache Spark Structured Streaming计划作业摄取事件流

Headspace用户事件动作(登录到应用程序,播放特定内容,更新订阅,搜索内容等)被聚合并转发到Kinesis数据流(图中的第一步).我们利用Databricks之上的Spark结构化流框架从这些Kinesis流中消费。结构化流有几个好处,包括:

  • 同样的杠杆作用统一语言(Python/Scala)和框架(Apache Spark)),由数据科学家、数据工程师和分析师共享,允许多个Headspace团队使用熟悉的Dataset / DataFrame api和抽象来推理用户数据。
  • 允许我们的团队实现自定义微批处理逻辑以满足业务需求。例如,我们可以基于每个用户的自定义事件时间窗口和会话水印逻辑触发和定义微批处理。
  • 附带现有的Databricks基础设施工具极大地减轻了机器学习工程师的基础设施管理负担。这些工具包括预定作业、自动重试、高效的DBU信用定价、处理失败事件的电子邮件通知、内置的Spark Streaming仪表板以及快速自动扩展以满足用户应用程序事件活动峰值的能力。

结构化流媒体应用micro-batching将连续的事件流分解为离散的块,在小的微批数据帧中处理传入事件。

流数据管道必须区分事件时间(当事件实际发生在客户端设备上时)和处理时间(当数据被服务器看到时)。网络分区、客户端缓冲和其他一系列问题都可能导致这两个时间戳之间出现重大差异。结构化流API允许简单的逻辑定制来处理这些差异:

df.withWatermark (“eventTime”“十分钟”) \.groupBy (“标识”窗口“eventTime”“十分钟”“5分钟”))

在github上查看

我们使用以下参数配置结构化流作业:

  • 1最大并发运行
  • 无限的重试
  • 新的计划作业集群(与通用集群不同)

使用计划作业集群显著降低了计算DBU成本,同时还降低了相关基础设施故障的可能性。在故障集群上运行的作业(可能存在缺失/不正确的依赖项、实例概要文件或过载的可用分区)将失败,直到底层集群问题得到解决,但在集群之间分离作业可以防止干扰。

然后,我们将流查询指向从专门配置的Amazon Kinesis流中读取,该流聚合了用户客户端事件(图的第二步).流查询可以使用以下逻辑进行配置:

处理器= RealTimeInferenceProcessor()
              Query = df。writeStream \.option(“checkpointLocation”、“dbfs:/ / pathToYourCheckpoint”)\.foreachBatch processor.process_batch \.outputMode (“添加”) \.start ()

在github上查看

在这里,outputMode定义数据如何写入流接收器的策略,可以取三个值:追加、完成和更新。由于结构化流作业与处理传入事件有关,因此我们选择只附加到处理“新”行。

配置检查点位置以优雅地重新启动失败的流查询是一个好主意,允许“回放”在失败之前恢复处理。

根据业务用例,我们还可以通过将参数设置为来减少延迟processingTime = " 0秒",以尽快启动每个微批处理:

查询df。writeStream \.option(“checkpointLocation”、“dbfs://pathToYourCheckpoint”)\.foreachBatch process_batch \.outputMode \(“追加”)触发(processingTime0秒”)\开始()

在github上查看

此外,我们的Spark结构化流作业集群假设一个特殊的EC2实例概要使用适当的IAM策略与AWS Sagemaker功能组交互,并将消息放入我们的预测作业SQS队列中。

最终,由于每个结构化流作业包含不同的业务逻辑,我们将需要实现不同的微批处理函数,每个微批处理将调用一次。

在我们的例子中,我们实现了aprocess_batch首先计算/更新在线特征的方法AWS Sagemaker功能商店,然后将用户事件转发到作业队列(步骤3):

pyspark.sql.dataframe进口DataFrame作为SparkFrameRealTimeInferenceProcessor处理器):def__init__自我):自我。feature store = initialize_feature store()defprocess_batchself, df: SparkFrame, epoch:str) - - - >没有一个”“”具体实现了流查询的微批处理逻辑。参数:df (SparkFrame):要处理的微批Spark DataFrame。epochID (str):批的标识符。”“”self.feature_store compute_online_features (df)
              forward_micro_batch_to_job_queue (df)

在github上查看

编排层:解耦的事件队列和Lambdas作为特性转换器

头顶空间用户产生事件,我们的实时推理模型在下游消费这些事件,从而提出新的建议。然而,用户事件活动量并不均匀分布。有各种各样的高峰和低谷——我们的用户通常在一天中的特定时间最活跃。

编排层:解耦的事件队列和Lambdas作为特性转换器

放置在SQS预测作业队列中的消息由AWS Lambda函数(图中的步骤4),执行以下步骤:

  • 解包消息并为我们想要推荐的用户获取相应的在线和离线特性(图中的步骤5).例如,我们可以使用用户任期级别、性别和地区等属性来增强事件的基于时间/会话的特性。
  • 执行任何最终的预处理业务逻辑。一个例子是协同过滤模型可用的Headspace用户id到用户序列id的映射。
  • 选择适当的服务Sagemaker模型,并使用输入特性调用它(图中的步骤6).
  • 沿着建议转发到其下游目的地(图中的步骤7).实际位置取决于我们是希望用户下拉内容推荐还是将推荐推送给用户:

:这种方法包括将最终推荐的内容持久化到我们的内部预测服务,该服务负责根据客户端应用程序请求最终为用户提供Headspace应用程序的许多标签的更新个性化内容。下面是一个使用实时推理基础设施的示例实验,允许用户从应用程序的Today选项卡获取个性化推荐:

Headspace应用内模式推送推荐由用户最近搜索睡眠内容触发

推动:该方法涉及将推荐放到另一个SQS队列中,用于推送通知或应用内模式内容推荐。下图是由用户最近搜索睡眠内容触发的应用内模式推送推荐的例子,下图是由用户最近完成内容触发的iOS推送通知:

Headspace应用内模式推送推荐由用户最近搜索睡眠内容触发

在完成一个特定的冥想或执行搜索的几分钟内,这些推送通知可以提供相关的下一个内容,而上下文仍然是用户最关心的。

此外,利用此事件队列允许重试预测作业请求——可以为SQS队列设置一个小的可见超时窗口(10-15秒),以便如果预测作业没有在该时间窗口内完成,则调用另一个Lambda函数重试。

总结

从基础设施和体系结构的角度来看,一个重要的学习是在不同的服务之间设计灵活的移交点——在我们的例子中,是发布层、接收层、协调层和服务层。例如,

  • 结构化流作业发送到预测SQS队列的消息有效负载应该使用什么格式?
  • 在每个Sagemaker模型期望的模型签名和HTTP POST有效负载中有什么?
  • 我们如何同步模型图像和在线特征存储,以便在生产中安全可靠地更新重新训练的模型?

主动解决这些问题将有助于将复杂ML体系结构的各个组件解耦为更小的模块化基础设施集。

Headspace ML团队仍在为这一基础设施推出生产用例,但与其他Headspace项目和行业基准相比,最初的A/B测试和实验已经在内容启动率、内容完完率和直接/总推送打开率方面取得了显著提升。

通过利用能够实时推断的模型,Headspace显著缩短了用户操作和个性化内容推荐之间的端到端交付时间。事件流——最近的搜索,内容开始/退出/暂停,应用内导航操作,甚至生物特征数据——在当前会话中都可以被利用来不断更新我们为用户提供的建议,同时他们仍在与Headspace应用程序交互。

要了解BOB低频彩更多关于Databricks机器学习的信息,请收听2021年数据+人工智能峰会主题演讲获取优秀的概述,并在Databricks ML主页

BOB低频彩了解更多关于Headspace的信息www.headspace.com

免费试用Databricks

相关的帖子

看到所有公司博客上的帖子