一个常见的数据科学物联网(IoT)用例涉及使用来自物联网传感器大军的实时数据训练机器学习模型。有些用例要求每个连接的设备都有自己的单独模型,因为许多基本的机器学习算法通常优于单个复杂模型。我们可以在供应链优化、预测性维护、电动汽车充电、智能家居管理或任何其他用例中看到这一点。问题是:
- 整个物联网数据非常大,任何一台机器都装不下
- 每个设备的数据只适合一台机器
- 每个设备都需要一个单独的模型
- 数据科学团队正在使用sklearn和pandas这样的单节点库来实现,因此他们在分发他们的单机概念证明时需要低摩擦
在本博客中,我们将演示如何通过针对每个物联网设备的两种不同方案来解决这个问题:模型训练和模型评分。
多物联网设备ML解决方案
这是一个典型的大数据问题。天气传感器和车辆等物联网设备产生了大量的数据点。单机解决方案无法解决如此复杂的问题,而且通常不能很好地集成到生产环境中。数据科学团队不想担心他们使用的DataFrame是单机pandas对象还是由Apache Spark分发。还有一件事:我们需要在某个地方记录我们的模型和它们的性能,以便再现、监视和部署。
下面是我们需要解决这个问题的两个模式:
- 模型训练:创建一个函数,将单个设备的数据作为输入。训练模型。记录结果模型和使用的任何评估指标MLflow,一个开源bob下载地址的机器学习生命周期平台bob体育客户端下载
- 模型评分:创建第二个函数,从该设备的MLflow中提取训练好的模型,应用它,并返回预测结果
有了这些抽象,我们只需要将函数转换为熊猫UDF,以便用Spark分发它们。Pandas UDF允许在Spark作业中有效地分发任意Python代码,允许分发其他串行操作。然后,我们将采用单节点解决方案,并使其尴尬地并行。
物联网模型培训
现在让我们仔细看看模型训练。从一些虚拟数据开始。我们有一个连接设备的舰队,每个设备都有一些样本,一些功能,以及我们正在期待预测的标签。与物联网设备通常的情况一样,可以使用Spark来完成特性化步骤,以利用其可伸缩性。
进口pyspark.sql.functions作为fDf =(火花。范围(10000*1000).select (f.col (“id”) .alias (“record_id”), (f.col (“id”) %10) .alias (“device_id”)).withColumn (“feature_1”, f.r兰德()*1).withColumn (“feature_2”, f.r兰德()*2).withColumn (“feature_3”, f.r兰德()*3.).withColumn (“标签”, (f.col (“feature_1”) + f.col(“feature_2”) + f.col(“feature_3”)) + f.rand()))
接下来,我们需要定义训练函数将返回的模式。我们希望返回设备ID、训练中使用的记录数量、到模型的路径和评估指标。
进口pyspark.sql.types作为t
trainReturnSchema = t.StructType([t.StructField (“device_id”t.IntegerType ()),#唯一设备IDt.StructField (“n_used”t.IntegerType ()),培训中使用的记录数量t.StructField (“model_path”t.StringType ()),#指定设备的模型路径t.StructField (mse的t.FloatType ())#度量模型性能])
定义一个Pandas UDF熊猫DataFrame一组数据作为输入,并返回模型元数据作为输出。
进口mlflow进口mlflow.sklearn进口熊猫作为pd从sklearn.ensemble进口RandomForestRegressor从sklearn.metrics进口mean_squared_error@f.pandas_udf (trainReturnSchema functionType = f.PandasUDFType.GROUPED_MAP)deftrain_model(df_pandas):“‘在分组实例上训练sklearn模型“‘#提取元数据Device_id = df_pandas[“device_id”] .iloc [0]N_used = df_pandas.shape[0]Run_id = df_pandas[“run_id”] .iloc [0]#获取运行ID以进行嵌套运行#训练模型X = df_pandas[[“feature_1”,“feature_2”,“feature_3”]]Y = df_pandas[“标签”]rf = RandomForestRegressor()射频。fit(X, y)#评估模型预测(X)Mse = mean_squared_error(y,预测)注意,我们可以添加列车/测试分割#恢复顶级训练与mlflow.start_run (run_id = run_id):#为特定设备创建嵌套运行与mlflow.start_run (run_name =str(device_id),嵌套的=真正的)作为运行:mlflow.sklearn.log_model (rf,str(device_id))mlflow.log_metric (“mse”mse)artifact_uri =f: /{run.info.run_id}/{device_id}"创建一个与上面模式匹配的返回pandas DataFramereturnDF = pd。DataFrame([[device_id, n_used, artifact_uri, mse]],列= [“device_id”,“n_used”,“model_path”,“mse”])返回returnDF
在MLflow中嵌套运行的IoT设备模型日志记录
的MLflow跟踪包允许我们记录机器学习开发过程的不同方面。在我们的例子中,我们将为每个设备创建一个运行(或一次机器学习代码的执行)。我们将使用一个父运行将这些运行聚合在一起。
这还允许我们查看是否有任何单个模型的性能低于其他模型。我们只需要在Pandas UDF中添加日志逻辑,如上所示。即使这段代码将在集群的工作节点上执行,如果我们在开始嵌套运行之前启动父运行,我们仍然能够将这些模型记录在一起。
我们可以只查询MLflow来获得每个模型的URI。相反,从Pandas UDF返回URI只是让整个管道更容易拼接在一起。
并行训练
现在我们只需要应用分组地图熊猫UDF。只要任何给定设备的数据都适合Spark集群的一个节点,我们就可以分发训练。首先运行MLflow父进程,然后使用groupby和apply应用Pandas UDF。
与mlflow.start_run (run_name =“所有设备的培训课程”)作为运行:Run_id = run.info.run_uuid
modeldirectoresdf = (df.withColumn (“run_id”f.lit (run_id))#添加run_id.groupby (“device_id”)苹果(train_model))
combinedDF = (df. join (modelDirectoriesDF =“device_id”, =“左”))
好了!现在已经为每个设备训练和记录了一个模型。
物联网模型评分
现在来打分。这里的优化技巧是确保为每个设备只获取一次模型,从而限制通信开销。然后,我们像在单机上下文中那样应用该模型,并返回记录id及其预测的pandas DataFrame。
applyReturnSchema = t.StructType([t.StructField (“record_id”t.IntegerType ()),t.StructField (“预测”t.FloatType ())])@f.pandas_udf (applyReturnSchema functionType = f.PandasUDFType.GROUPED_MAP)defapply_model(df_pandas):“‘将模型应用于特定设备的数据,表示为pandas DataFrame“‘Model_path = df_pandas[“model_path”] .iloc [0]Input_columns = [“feature_1”,“feature_2”,“feature_3”]X = df_pandas[input_columns]
Model = mlflow.sklearn.load_model(model_path)预测(X)
returnDF = pd。DataFrame ({“record_id”: df_pandas (“record_id”),“预测”:预测})返回returnDFpredictionDF = combinedDF.groupby(“device_id”苹果(apply_model)
注意,在每种情况下,我们都使用了分组映射熊猫UDF。在第一种情况下,我们将一个组作为输入,并为每个设备返回一行(多对一映射)。在本例中,我们将一个组作为输入,并为每行返回一个预测(一对一映射)。分组地图熊猫UDF允许这两种方法。
结论
这样你就有了在物联网设备大军中训练的个性化模型。这支持了这样一种观点,即许多基本模型通常优于一个单一的、更复杂的模型。即使这是普遍情况,也可能有一些个别型号的性能低于平均水平,部分原因是该设备的数据有限或缺失。以下是一些进一步改善的建议:
- 使用训练和评估度量中可用的记录数量,您可以很容易地描绘出表现良好的单个模型和表现较差的模型。您可以使用此信息在单个设备模型和在整个舰队上训练的模型之间切换。
- 您还可以训练一个集成模型,该模型从每个设备的模型中获取预测,从整个舰队的模型中获取预测,以及像评估指标和每个设备的记录数量这样的元数据。这将创建一个最终的预测,以改善表现不佳的单个模型。
开始使用MLflow物联网设备
准备好自己试试了吗?您可以在一个可运行的笔记本上看到本文中使用的完整示例AWS或Azure.
如果您是MLflow的新手,请阅读MLflow快速入门的最新MLflow发行版.有关生产用例,请阅读相关内容在Databricks上管理MLflow.