工程的博客

通过闷烧实时燃烧电子健康记录

分享这篇文章

请查看解决方案加速器下载本博客中提到的笔记本。

在以前的博客,我们研究了处理来自电子健康记录(EHR)的患者数据的两个独立工作流程。在这些工作流中,我们主要关注EHR数据的历史批处理提取。然而,在现实世界中,数据不断地输入到EHR中。对于许多重要的预测医疗保健分析用例,例如脓毒症的预测ER过度拥挤在美国,我们需要处理通过电子病历的临床数据。

数据实际上是如何流经电子病历的?通常,有几个细化过程。在一些EHR实现中,数据首先以近乎实时的方式进入NoSQL风格的操作存储。一天之后,这个NoSQL存储中的新数据将从操作存储转移到规范化和多维SQL存储中。其他EHR实现有不同的数据库实现,但在EHR数据的最终“历史”记录可用于批处理分析之前,通常会有一个延迟。为了实时分析临床数据,我们需要访问基于推送的任何一种HL7消息提要或基于FHIR API端点.这些提要和端点包含各种健康信息数据。例如,入院/出院/转院(ADT)消息可用于跟踪患者何时进入病房或在病房之间移动,而订单输入(ORM)消息可下达/修改/取消订单,例如为患者输氧或进行特定的实验室测试。通过将饲料和资源结合在一起,我们可以全面了解我们的病人和我们的医院。

医疗团队和临床医生在构建基于EHR数据的实时分析系统时面临许多问题:我应该使用FHIR还是HL7?我应该如何解析这些记录?我应该如何存储和合并数据?在这篇博客中,我们将介绍Smolder,这是一个使用Apache Spark™操作EHR数据的开源库。Smolder提供Spark原生数据加载器和api,将HL7消息转换为Apache Spark™SQL DataFrames。为了简化消息中的内容操作、验证和重新映射,Smolder添加了用于访问消息字段的SQL函数。最终,这使得构建流管道以在几分钟内摄取和分析HL7数据成为可能,同时提供了易于使用的声明性语法,无需学习像HAPI这样的低级库.这些管道可以实现接近实时的延迟,同时实现跨医疗保健数据源的数据统一通过利用开源标准,比如bob下载地址三角洲湖

流程图显示数据流如何进入三角洲湖,然后进入仪表板

在本博客的其余部分,我们将使用Smolder库来近实时地分析EHR数据,以确定具有高护理利用率的患者。首先,我们将深入研究HL7v2标准它是如何工作的,它意味着什么。然后,我们将讨论指导我们开发Smolder的设计原则。最后,我们将展示如何将数据加载到三角洲湖通过使用Apache Spark的结构化流api和Smolder库。我们将使用这些数据来驱动一个仪表板,帮助我们实时识别在我们的医院系统中哪里有高利用率的患者。

使用HL7消息

HL7v2
HL7代表健康等级7,一个为医疗保健定义互操作性标准的国际标准组织。他们维护多个重要的标准,例如新兴的FHIR医疗保健数据交换标准定义了用于交换医疗保健数据的REST api和JSON模式(合并)临床文献架构(CDA和C-CDA)标准,以及最初的HL7v2标准。这些标准有不同的目的:FHIR定义了对应用程序开发有用的API端点,C-CDA定义了最适合用于交换关于患者的历史信息的标准,HL7v2是一种消息传递方言,用于捕获EHR中出现的状态和数据的实时更新。虽然最近有很多人对FHIR应用程序开发感兴趣,但HL7v2通常是最适合用于分析的标准,因为它被广泛采用。大多数机构目前都有利用HL7提要的遗留系统。因此,更容易挖掘更大量的数据并创建更丰富的分析基础,而不必等待每个系统都支持FHIR。此外,HL7提要本身通常是基于TCP/IP的数据流,这与基于事件的流架构非常吻合。

MSH |^~\&||||| 20201020150800.739 + 0000 | | ADT ^ A03 ^ ADT_A03 | 11374301 | P | 2.4犯| A03 | 20170820074419 - 0500PID|||d40726da-9b7a-49eb-9eeb-e406708bbb60||海勒^肯尼斯||||||140 Pacocha Way Suite 52^^北安普顿^马萨诸塞州^^美国厄尔pv | |的| ^ ^ ^迪金森医院公司动态| | | | | 49318 f80-bd8b-3fc7-a096-ac43088b0c12厄尔^ ^ |||||||||||||||||||||||||||||||||||| 20170820074419 - 05 \00

代码:这是一个示例HL7消息。这是一条ADT_A03消息,它提供了病人出院的信息。我们使用开源的Synthea医疗记录模拟器生成了这条消息。

既然我们已经确定了HL7v2,那么让我们从查看HL7v2消息的内容开始。上图显示了一条HL7v2消息,它是多行、管道分隔的。该规范还有一个XML版本。FHIR是一个基于json的规范。HL7v2规范指定了每种消息类型和段类型的模式。消息的每一行都是一个段,第一列是“段描述符”,它告诉我们该模式在该段中用于什么。当我们解析上面的ADT_A03消息时,我们有两个段;“PID”段标识符包含关于患者身份的信息,“PV1”段包含关于患者出院时的就诊信息。在患者身份段中,第二个字段是患者的ID,第四个字段是姓名,依此类推。

那么,如何处理这些数据呢?我们已经帮助多个客户使用Apache Spark的流功能将HL7数据导入Databricks。从历史上看,我们看到客户在他们的电子病历和流媒体服务之间建立了一个管道Apache卡夫卡™️, Kinesis或EventHubs。然后,他们将其中一个流总线连接到Apache Spark,生成消息文本的DataFrame。最后,他们使用手写的解析器或像HAPI这样的低级库来解析文本。

虽然这种方法对一些客户有效,但不得不手工编写解析库——或者依赖于像HAPI这样有开销的库——可能会带来挑战。数据湖的主要好处之一是能够延迟管道中的验证。我们通常将其称为青铜层,即以原始形式摄取和存储的数据。这使您可以灵活地维护观察到的历史数据,而不必对数据的大小和形状做出任何选择。

当您需要在开始面向业务的用例之前进行历史验证和分析时,这尤其有用。例如,考虑验证初级保健提供者(PCP)字段的情况。如果您发现一个医疗保健系统正在与护理团队中的其他医生交换PCP,那么您将希望能够在所有这些记录中纠正错误。闷烧的设计,赞同这一范式。

与HAPI不同的是,Smolder将消息分解为struct,这些struct只尝试验证有效的HL7消息。这使我们能够捕获观察到的数据,同时仍然可以访问查询和填充我们的银层。

设计Smolder,一个用于在Apache Spark中使用HL7的开源库

我们开始开发Smolder是为了提供一个易于使用的系统,该系统可以实现处理HL7v2消息的近乎实时延迟,并使该数据可用于数据科学和可视化的大型工具生态系统。为此,我们采取了以下方法:

  • 用一行代码将HL7消息转换为dataframe:dataframe在数据科学中广泛使用——无论是通过Pandas、R还是spark——并且可以通过广泛访问的声明性编程框架(如SQL)使用。如果我们可以用一行代码将HL7消息加载到一个DataFrame中,那么我们就极大地增加了可以使用HL7消息的下游位置的数量。
  • 使用简单的声明性api从消息中提取数据:而图书馆哈皮神提供了用于处理HL7v2消息的api,这些api是复杂的、深度面向对象的,并且需要大量关于HL7v2消息格式的知识。如果我们可以为人们提供一行类似sql的函数,他们就可以理解HL7消息中的数据,而不需要学习新的复杂的API。
  • 为HL7消息提供一致的模式和语义,无论其来源是什么:Smolder既支持直接摄取HL7v2消息,也支持摄取来自其他流源(无论是开源工具)的HL7v2消息文本Apache卡夫卡或者云特定服务AWS的动作Azure的EventHubs.无论源是什么,消息总是被解析到相同的模式中。当与Apache Spark的结构化流语义,我们实现了可移植的、与平台无关的代码,这些代bob体育客户端下载码可以很容易地进行验证,并在批处理或流数据处理期间等效地运行。

最终,这种方法使Smolder成为一个易于学习和使用的轻量级库,它可以支持对大量HL7消息要求苛刻的sla。现在,我们将深入研究Smolder的api,以及如何构建一个分析入院模式的仪表板。

使用Smolder解析HL7消息

Apache Spark™的结构化流API允许用户使用Spark SQL api的扩展来处理流数据。当与Smolder库结合使用时,您可以将HL7v2消息加载到DataFrame中,可以使用Smolder读取一批原始HL7v2消息,也可以使用Smolder解析来自其他流源的HL7v2消息文本。例如,如果你有一批消息要加载,你只需调用hl7阅读器:

Scala > val df = spark.read.format(“hl7”) .load (“路径/ / hl7消息”df: org.apache.spark.sql.DataFrame = [message:]字符串段:数组<结构> >)< /结构>

返回的模式包含消息列中的消息头。消息段嵌套在segments列中,这是一个包含两个嵌套字段的数组:段的字符串id(例如,a的PID)患者识别段)和一个段字段数组。

Smolder还可以用于解析原始消息文本。如果HL7消息提要首先到达中间源(例如,Kafka流),则可能会发生这种情况。为此,我们可以使用Smolder的parse_hl7_message helper函数。首先,我们从包含HL7消息文本的DataFrame开始:

scala>val textMessageDf...textMessageDf: org.apache.spark.sql.DataFrame价值字符串):scala>textMessageDf.show ()+--------------------+|价值|+--------------------+|MSH||||||2020...|+--------------------+

然后,我们可以从com. databrks .实验室.smolder.functions对象中导入parse_hl7_message消息,并将其应用到我们想要解析的列上:

scala >进口com.databricks.labs.smolder.functions.parse_hl7_message进口com.databricks.labs.smolder.functions.parse_hl7_messagescala> val parsedDf = textMessageDf.select(parse_hl7_message($“价值”).作为“消息”))parsedDf: org.apache.spark.sql.DataFrame = [message: struct>>>]

这将产生与hl7数据源相同的模式。

使用Smolder从HL7v2消息段中提取数据

虽然Smolder为HL7消息提供了易于使用的模式,但我们还在com.databricks.实验室. Smolder .functions中提供了帮助函数,以提取消息段的子字段。例如,假设我们想要获得病人的名字,它是病人ID (PID)段中的第5个字段。我们可以用segment_field函数来提取它:

scala>进口com.databricks.labs.smolder.functions.segment_field进口com.databricks.labs.smolder.functions.segment_fieldscala>val nameDfdf.select (segment_field(“PID”,4) .alias(“名字”))nameDf: org.apache.spark.sql.DataFrame(名称:字符串)scala>nameDf.show ()+-------------+|的名字|+-------------+|海勒Keneth|+-------------+

如果我们想要得到病人的名字,我们可以使用子字段函数:

scala>进口com.databricks.labs.smolder.functions.subfield进口com.databricks.labs.smolder.functions.subfieldscala>val firstNameDfnameDf.select(子域(“名字”美元,1) .alias(“firstname”))firstNameDf: org.apache.spark.sql.DataFrame(firstname:字符串)scala>firstNameDf.show ()+---------+|firstname|+---------+|Keneth|+---------+
  • 青铜层包含原始消息提要(例如,ADT、ORM、ORU等的每个提要的一个表)
  • 银层将此信息聚合到对下游应用程序有用的表中(例如,a患者纵向记录,关于医院资源的汇总)
  • 黄金层包含应用程序级数据(例如,对于医院过度拥挤警报系统,每个医院的每个病房的占用率)

为什么要建在三角洲湖上?首先,Delta是一种开放的格式,可以确保从许多分析系统中轻松访问数据,无论是通过Apache Spark还是通过Apache Spark作为数据科学生态系统数据仓库系统,比如Synapse.此外,Delta Lake设计为支持级联流,这意味着数据可以从青铜层流到银层,最后流到金层。此外,三角洲湖提供了许多方法优化我们的表以提高查询性能.例如,我们可能希望快速查询患者ID和就诊日期,因为这些是常用的查询字段:正如我们在之前的博客中讨论的那样,我们可以使用z排序。Delta Lake支持z排序来进行多维数据聚类,并在这两种查询模式上提供良好的性能。

用闷烧开始建造一个生命值三角洲湖

在这篇博客中,我们介绍了闷烧,一个Apache 2授权的库,用于从EHR加载患者数据。你可以从…开始阅读我们的项目文档,或创建存储库的一个分支今天就开始贡献代码。要了解BOB低频彩更多关于使用Delta Lake存储和处理临床数据集,请下载我们的免费电子书的工作与现实世界的临床数据集.你也可以从今天开始使用这个笔记本的免费试用解决方案加速器

免费试用Databricks
看到所有工程的博客的帖子