工程的博客

利用真实世界数据检测高危患者

如何在健康Delta Lake上使用机器学习运行时和MLflow来预测患者疾病
分享这篇文章

随着低成本基因组测序和人工智能医学成像的兴起,人们对精准医疗产生了浓厚的兴趣。在精准医疗领域,我们的目标是利用数据和人工智能为疾病提出最佳治疗方案。虽然精准医疗改善了被诊断患有罕见疾病和癌症的患者的预后,但精准医疗是被动的:患者必须生病才能部署精准医疗。

当我们审视医疗保健支出和结果时,我们有巨大的机会通过以下方式来提高医疗成本和生活质量防止慢性疾病,如糖尿病、心脏病或物质使用障碍。在美国,十分之七的死亡和85%的医疗支出是由慢性疾病引起的,发现了类似的趋势在欧洲而且东南亚.非传染性疾病通常可以通过对患者的教育和解决导致慢性疾病的根本问题来预防。这些问题可能包括潜在的生物风险因素,如已知的导致神经系统疾病的遗传风险,社会经济因素,比如环境污染或缺乏获得健康的途径食品/预防保健,行为风险,如吸烟、饮酒或久坐不动的生活方式

精密的预防重点是利用数据确定有患病风险的患者群体,然后提供降低疾病风险的干预措施。干预措施可能包括一款数字应用程序,可以远程监控高危患者,并提供生活方式和治疗建议加强对疾病状况的监测,或提供补充预防性护理.然而,部署这些干预措施首先取决于确定有风险的患者。

识别风险患者最强大的工具之一是使用真实世界数据(RWD),这个术语统称为医疗保健生态系统生成的数据,例如来自住院、临床实践、药房、医疗保健提供者的电子病历(EMR)和健康记录(EHR),以及越来越多地从基因组学、社交媒体和可穿戴设备等其他来源收集的数据。在我们的上一篇博文我们演示了如何从EHR数据建立临床数据湖。在这篇博客中,我们通过使用Databricks统一数据分析平台来跟踪患者的旅程,并创建一个机器学习模型。bob体育客户端下载使用这个模型,给定患者的病史和人口统计信息,我们可以在给定的时间窗口内评估患者特定疾病的风险。在这个例子中,我们将看看药物滥用,这是一个重要的话题物质使用障碍导致的广泛不良健康后果.通过跟踪我们的模型使用MLflow我们可以很容易地跟踪模型随时间的变化,他补充道对将模型应用于病人护理的过程的信心

在Databricks上使用机器学习进行疾病预测

从电子病历数据预测疾病风险的参考架构

数据准备

为了训练一个模型在给定时间预测风险,我们需要一个数据集,该数据集捕获有关患者的相关人口统计信息(如遇到患者时的年龄、种族等),以及关于患者诊断史的时间序列数据。然后,我们可以使用这些数据来训练一个模型,该模型可以了解影响患者在即将到来的时间段内被诊断出疾病的可能性的诊断和人口风险。

显示从EHR中提取的表的模式和表之间关系的图表。
图1:从EHR中提取的数据模式和表之间的关系

为了训练这个模型,我们可以利用患者的遭遇记录和人口统计信息,就像电子健康记录(EHR)中提供的那样。图1描述了我们将在工作流程中使用的表。这些表格是用我们之前博客里的笔记本.我们将继续从Delta Lake加载遭遇、组织和患者数据(带有模糊的PII信息),并创建一个包含所有患者遭遇以及患者人口统计信息的数据框架。

patient_encounters遇到加入(病人,“病人”])加入(组织、“组织”])显示器(patient_encounters.filter (' reasondescription不是null ') .limit (10))

基于目标条件,我们还选择了一组符合训练数据的患者。也就是说,我们包括病例,通过他们的接触史至少被诊断出患有这种疾病的患者,以及同等数量的对照组,没有任何疾病史的患者。

positive_patientspatient_encounters选择“病人”在哪里较低的(“REASONDESCRIPTION”)。就像(“%{}%”.format (条件))).dropDuplicates ().withColumn (“is_positive”点燃(真正的))negative_patientsall_patients加入(positive_patients“病人”),如何“left_anti”.limit (positive_patients。()).withColumn (“is_positive”点燃())patients_to_studypositive_patients.union (negative_patients)

现在我们将我们的接触范围限制在研究中包括的患者。

qualified_patient_encounters_dfpatient_encounters加入(patients_to_study“病人”])过滤器("DESCRIPTION is not NUll")

现在我们有了感兴趣的记录,下一步是添加特性。在这个预测任务中,除了人口统计信息外,我们还选择了被诊断患有这种疾病或任何已知共存疾病(共病)的总次数和以前的遭遇次数作为给定遭遇的历史背景。

虽然对于大多数疾病,有大量关于共病病症的文献,但我们也可以利用现实世界数据集中的数据来确定与目标病症相关的共病。

comorbid_conditionspositive_patients。加入(patient_encounters,[“病人”])在哪里(坳(“REASONDESCRIPTION”) .isNotNull ()).dropDuplicates ([“病人”“REASONDESCRIPTION”]).groupBy (“REASONDESCRIPTION”).().orderBy (“数”,提升.limit (num_conditions)

在我们的代码中,我们使用笔记本电脑部件指定要包括的共病的数量,以及时间的长度(以天为单位)来查看遭遇。使用MLflow记录这些参数的跟踪API

使用MLflow记录参数,比如我们正在研究的条件。

现在我们需要为每次遭遇添加共病特征。与每一种共病相对应,我们添加一列,表示在过去观察到感兴趣的条件的次数,即。

在给定c的条件下,i / t-w≤i < t,对指示器函数xi,c求和。

在哪里

指标函数xi,c的定义,如果患者在第i时刻被诊断为c病症,xi,c为1,否则为0。

我们分两个步骤添加这些特性。首先,我们定义一个函数,添加共病指标函数(即。x我,c):

def add_comorbidities (qualified_patient_encounters_df comorbidity_list):output_dfqualified_patient_encounters_dfidx0伴随疾病comorbidity_list:output_dfoutput_df.withColumn(“comorbidity_ % d”idx (output_df [“REASONDESCRIPTION”]。就像“%”+伴随疾病(“REASONDESCRIPTION”+“%”))。“int”)).withColumn(“comorbidity_ % d”idx,合并(坳(“comorbidity_ % d”idx),点燃(0)) #替换0.cache ()idx+1返回(output_df)

然后我们将这些指标函数在连续几天内相加使用Spark SQL对窗口函数的强大支持

def add_recent_encounters (encounter_features):lowest_dateencounter_features选择“START_TIME”.orderBy (“START_TIME”.limit (1.withColumnRenamed (“START_TIME”“EARLIEST_TIME”output_dfencounter_features.crossJoin (lowest_date).withColumn(“天”,datediff(坳(“START_TIME”)、坳(“EARLIEST_TIME”))).withColumn(“patient_age”,datediff(坳(“START_TIME”)、坳(“生日”)))wWindow.orderBy (output_df [“天”]).partitionBy (output_df [“病人”]).rangeBetween (-int(num_days),-1comorbidity_idx范围(num_conditions):col_name“recent_ % d”comorbidity_idxoutput_dfoutput_df.withColumn (col_name总和(坳(“comorbidity_ % d”comorbidity_idx))。(w)).withColumn (col_name合并(坳(col_name),点燃(0)))返回(output_df)

在添加共病特征之后,我们需要添加目标变量,它表明患者是否在未来给定的时间窗口(例如当前遭遇后的一个月)被诊断为目标疾病。此操作的逻辑与前一步非常相似,不同之处在于时间窗口涵盖了未来的事件。我们只使用二进制标签,表示我们感兴趣的诊断是否会在未来发生。

def add_label (encounter_features num_days_future):wWindow.orderBy (encounter_features [“天”]).partitionBy (encounter_features [“病人”]).rangeBetween (0num_days_future)output_dfencounter_features.withColumn (“标签”马克斯(坳(“comorbidity_0”))。(w)).withColumn (“标签”合并(坳(“标签”),点燃(0)))返回(output_df)

现在我们将这些特性写入Delta Lake中的特性存储中。为了确保重现性,我们将mlflow实验ID和运行ID作为一列添加到特征存储中.这种方法的优点是我们可以接收更多的数据,我们可以向featurestore中添加新的特性,以便将来可以重复使用。

控制数据的质量问题

在我们继续训练任务之前,我们看一下数据,看看不同的标签在类之间是如何分布的。在二元分类的许多应用中,一个类别可能是罕见的,例如在疾病预测中。这种阶层失衡会对学习过程产生负面影响。在估计过程中,模型倾向于以牺牲罕见事件为代价关注大多数类。此外,评估过程也受到了损害。例如,在一个0/1标签分别分布为95%和%5的不平衡数据集中,一个总是预测0的模型的准确率将为95%。如果标签是不平衡的,那么我们需要应用这是一种常见的技术用于校正不平衡数据。

在我们的训练集中,只有4%的遭遇在疾病诊断之前。

查看我们的训练数据,我们看到(图2)这是一个非常不平衡的数据集:超过95%的观察时间窗口没有显示诊断的证据。为了调整不平衡,我们可以降低控制类的样本或生成合成样本。这个选择取决于数据集的大小和特征的数量。在这个例子中,我们对majority类进行了低采样,以获得一个平衡的数据集。注意,在实践中,您可以选择方法的组合,例如对majority类和also进行下采样在你的训练算法中分配类权重

df1dataset_df.filter (“标签= = 1”n_df1df1。()df2dataset_df.filter (“标签= = 0”)采样(0.9) .limit (n_df1)training_dataset_dfdf1.union (df2)采样(1.0显示器(training_dataset_df.groupBy (“标签”).())

使用采样重新平衡我们的数据集。

模型训练

为了训练模型,我们用人口统计和共病特征的子集来增强我们的条件,对每个观察结果应用标签,并将这些数据传递给下游训练的模型。例如,在这里,我们增加了我们最近诊断的共病与遭遇类(例如,这次预约是为了预防保健还是去急诊室?,)和就诊费用,对于人口统计信息,我们选择种族、性别、Zip和患者就诊时的年龄。

大多数情况下,尽管原始临床数据加起来可能高达tb,但在根据纳入/排除标准执行过滤和限制记录之后,我们最终得到的数据集可以在一台机器上训练。我们可以很容易地将spark数据帧转换为熊猫dataframes并根据选择的任何算法训练一个模型。当使用Databricks ML运行时在美国,我们可以使用各种开放的ML库。

任何机器学习算法都有一组参数(超参数),根据输入参数的不同,分数可能会发生变化。此外,在某些情况下,错误的参数或算法会导致过拟合。为了确保模型性能良好,我们使用hyperparameter调优选择最好的模型架构,然后我们将通过指定从这一步获得的参数来训练最终的模型。

要执行模型调优,我们首先需要对数据进行预处理。在这个数据集中,除了数字特征(例如,最近共病的计数),我们还有我们想要使用的分类人口统计数据。对于分类数据,最好的方法是使用one-hot-encoding.这主要有两个原因:首先,大多数分类器(在这种情况下是逻辑回归)都是基于数字特征的。其次,如果我们简单地将分类变量转换为数值索引,它将在我们的数据中引入序数,这可能会误导分类器:例如,如果我们将州名转换为索引,例如加利福尼亚为5,纽约为23,那么纽约就变得比加利福尼亚“更大”。虽然这反映了按字母顺序排列的列表中每个州名的索引,但在我们的模型上下文中,这种排序并不意味着什么。一次性热编码消除了这种影响。

在这种情况下,预处理步骤不需要任何输入参数,超参数只影响分类器而不影响预处理部分。因此,我们分别执行预处理,然后使用结果数据集进行模型调优:

sklearn.preprocessing进口OneHotEncoder进口numpy作为npdefpre_processtraining_dataset_pdf):X_pdf = training_dataset_pdf.drop (“标签”轴=1y_pdf = training_dataset_pdf [“标签”onehotencoder = onehotencoder (handle_unknown=“忽略”one_hot_model = onehotencode .fit(X_pdf.values)X = one_hot_model.transform (X_pdf)y = y_pdf.values返回(X, y)

接下来,我们要选择模型的最佳参数。对于这种分类,我们使用LogisticRegression弹性净罚.请注意,在应用one-hot编码后,根据所讨论的分类变量的基数,我们最终可以得到许多可以超过样本数量的特征。为了避免这类问题的过拟合,对目标函数施加惩罚。弹性网络正则化的优点是它结合了两种惩罚技术(套索而且岭回归),混合的程度可以由一个变量控制,在超参数调谐。

为了改进该模型,我们使用hyperopt找到最好的参数。此外,我们使用SparkTrialshyperopt模式,以并行执行超参数搜索。这个过程利用Databricks的托管MLflow来自动记录与每个超参数运行对应的参数和度量。为了验证每一组参数,我们使用k-fold交叉验证撇除使用F1的分数作为评估模型的指标。请注意,由于k-fold交叉验证生成多个值,我们选择分数的最小值(最坏的情况),并在使用hyperopt时尽量使其最大化。

预处理功能在训练数据帧中的应用。

数学导入expdef params_to_lr (params):返回“惩罚”“elasticnet”“multi_class”“表达”“random_state”43“n_jobs”-1“规划求解”“传奇”“托尔”经验值(params [“托尔”]), #经验值()这里是因为超参数日志空间“C”经验值(params [“C”]),“l1_ratio”经验值(params [“l1_ratio”])def tune_model (params):mlflow.start_run (run_name“tunning-logistic-regression”、嵌套真正的作为运行:clfLogisticRegression (params_to_lr (params))。fit (X, y)损失-cross_val_score(clf, X, y,n_jobs-1,得分“f1”).最小值()返回“状态”: STATUS_OK,“损失”:损失}

为了改进对空间的搜索,我们在logspace中选择参数网格,并定义一个转换函数,通过超opt对建议的参数进行转换。为了更好地了解这种方法以及为什么我们选择这样定义超参数空间,请查看以下内容这个演讲涵盖了如何在Databricks上管理端到端ML生命周期。

导入fmin, hp, tpe, SparkTrials, STATUS_OKSearch_space = {在这里使用uniform而不是loguniform只是为了让指标在mlflow比较中更好地显示在logspace中“托尔”: hp.uniform (“托尔”,3.0),“C”: hp.uniform (“C”,20),“l1_ratio”: hp.uniform (“l1_ratio”,3.,1),spark_trials = SparkTrials(并行度=2Best_params = fmin(fntune_model空间search_space算法山丘建议max_evals= 32,rstatenp随机RandomState43),试用spark_trials

该测试的结果是基于交叉验证的f1评分评估的最佳参数。

params_to_lr (best_params)46]: {“惩罚”“elasticnet”“multi_class”“表达”“random_state”43“n_jobs”-1“规划求解”“传奇”“托尔”0.06555920596441883“C”0.17868321158011416“l1_ratio”0.27598949120226646

现在让我们看一看MLflow仪表板。MLflow自动将超opt的所有运行分组在一起,我们可以使用各种图来检查每个超参数对损失函数的影响,如图3所示。这对于更好地理解模型的行为和超参数的影响尤为重要。例如,我们注意到较低的C值,即正则化强度的倒数,会导致较高的F1值。

MLflow中模型的平行坐标图。
图3所示。MLflow中模型的平行坐标图。

在找到最优参数组合后,我们用最优超参数训练一个二元分类器,并使用MLflow记录模型。MLflow的模型api可以轻松地将模型存储为稍后在模型评分期间调用的python函数,而不管用于训练的底层库是什么。为了帮助模型的可发现性,我们使用与目标条件相关的名称记录模型(例如,在本例中为“药物过量”)。

进口mlflow.sklearn进口matplotlib.pyplot作为pltsklearn.pipeline进口管道mlflow.models.signature进口infer_signature因为我们希望模型输出概率(风险)而不是预测的标签,所以我们重写了# # mlflow。Pyfun的预测方法:SklearnModelWrappermlflow.pyfunc.PythonModel):def__init__自我,模型):自我。Model =模型def预测Self context model_input):返回self.model.predict_proba (model_input) [:,1def火车参数个数):mlflow.start_run (run_name =“training-logistic-regression”嵌套=真正的作为运行:mlflow.log_params (params_to_lr (params))X_arr = training_dataset_pdf.drop (“标签”轴=1) . valuesy_arr = training_dataset_pdf [“标签”) . valuesohe = OneHotEncoder(handle_unknown=“忽略”clf = LogisticRegression(**params_to_lr(params))。fit (X, y)管道([(在一个炎热的, ohe), (“clf”, clf)))
              Lr_model = pipe。fit (X_arr y_arr)score=cross_val_score(clf, ohe.transform(X_arr), y_arr,n_jobs=-1,得分=“准确性”) .mean ()wrapped_lr_model = SklearnModelWrapper(lr_model)model_name =“- - -”. join (condition.split ())mlflow.log_metric (“准确性”,得分)mlflow.pyfunc.log_model (model_name python_model = wrapped_lr_model)displayHTML (模型精度为: %s '%(分数))返回(mlflow.active_run () . info)

现在,我们可以通过传递从上一步获得的最佳参数来训练模型。

注意,对于模型训练,我们已经将预处理(一个热编码)作为sklearn管道的一部分,并将编码器和分类器记录为一个模型。在下一步中,我们可以简单地调用患者数据模型并评估他们的风险。

模型部署和生产

在训练模型并将其记录到MLflow之后,下一步是使用模型对新数据进行评分。MLflow的特性之一是可以根据不同的标签搜索实验。例如,在本例中,我们使用在模型训练期间指定的运行名来检索已训练模型的工件URI。然后,我们可以根据关键指标对检索到的实验进行排序。

进口mlflowbest_run = mlflow.search_runs (filter_string ="tags.mlflow.runName = 'training- logistics -regression'"order_by = (的指标。准确性DESC”]) .iloc [0model_name =药物过量的clf = mlflow.pyfunc.load_model (model_uri =“% s / % s”% (best_run.artifact_uri model_name))clf_udf = mlflow.pyfunc。spark_udf(火花,model_uri =“% s / % s”% (best_run.artifact_uri model_name))

一旦我们选择了一个特定的模型,我们就可以通过指定模型URI和名称来加载模型:

加载特性。

将MLflow加载模型应用到特征数据框架中。

我们还可以使用Databricks的模型注册表来管理模型版本、生产生命周期和简单的模型服务。

将疾病预测转化为精准预防

在这篇博客中,我们探讨了对精确预防系统的需求,该系统可以识别驱动慢性疾病发作的临床和人口统计学协变量。然后,我们研究了一个端到端机器学习工作流,该工作流使用来自电子病历的模拟数据来识别有药物过量风险的患者。在这个工作流的最后,我们能够导出我们从MLflow训练的ML模型,并将其应用于新的患者数据流。

虽然这个模型提供了信息,但在转化为实践之前,它不会产生影响。在现实世界的实践中,我们已经与许多客户合作,将这些和类似的系统部署到生产中。例如,在南卡罗来纳医科大学,他们能够部署实时流媒体管道,处理电子病历数据,以识别有败血症风险的患者。这可以提前8小时检测到脓毒症相关患者的衰退。在INTEGRIS Health的一个类似系统中,监测EHR数据以发现压疮发展的新迹象。在这两种情况下,只要患者被确认,护理团队就会被提醒他们的病情。在医疗保险环境中,我们与Optum合作部署了类似的模型。他们能够开发一种疾病预测引擎,在长期短期架构中使用循环神经网络来识别疾病进展,并在九个不同的疾病领域具有良好的泛化能力。该模型用于使患者与预防保健途径保持一致,从而改善慢性病患者的结果和护理成本。

虽然我们的大部分博客都专注于在医疗保健环境中使用疾病预测算法,但在制药环境中也有很强的机会构建和部署这些模型。疾病预测模型可以深入了解药物在上市后的使用情况,甚至可以发现以前未被发现的保护作用告知标签扩展工作.此外,疾病预测模型在观察罕见疾病(或其他诊断不足的疾病)的临床试验登记时是有用的.通过创建一个模型,观察在接受罕见疾病诊断之前被误诊的患者,我们可以创建教育材料,教育临床医生常见的误诊模式,并希望创建试验纳入标准,从而增加试验登记人数和提高疗效。

在健康的三角洲湖上进行精确的预防

在这篇博客中,我们演示了如何在真实世界的数据上使用机器学习来识别有患慢性疾病风险的患者。要了解BOB低频彩更多关于使用Delta Lake存储和处理临床数据集,请下载我们的关于使用真实世界临床数据集的免费电子书.您也可以开始免费试用今天使用患者风险评分记录本来自这个博客。

免费试用Databricks

相关的帖子

看到所有工程的博客的帖子