Building a Streaming Microservices Architecture with Apache Spark Structured Streaming and Friends

Download Slides

As we continue to push the boundaries of pipeline throughput and data serving layers, new approaches and technologies keep emerging to handle ever larger workloads — from real-time processing and aggregation of user/behavioral data, to rules- and condition-based distribution of event and metric streams, to almost any data pipeline and lineage problem. These workloads are typical in most modern data platforms and are critical to all operational analytics systems, data storage systems, ML/DL and more. A common problem I have seen at many companies boils down to a general data reliability problem: as companies scale and teams grow, processing components get scaled and migrated, and a handful of systems can quickly sprawl into a large number of independent components and service layers, all of which need to scale up, down, or out with zero downtime to meet the world's demand for data. In this technical deep dive, we will build a new mental model aimed at redefining how to build large-scale interconnected services with Kafka, Google Protocol Buffers/gRPC, and Parquet/Delta Lake/Spark Structured Streaming. The material presented in the deep dive is based on hard-won lessons learned while building a large real-time insights platform at Twilio, where data integrity and stream fault tolerance are as important as the services our company provides.

Watch more Spark + AI sessions here

Try Databricks for free

Video Transcript

– Hey, thanks for coming to my Spark Summit session, Building a Streaming Microservices Architecture with Spark Structured Streaming and Friends. My name is Scott Haines, I'm a Senior Principal Engineer at Twilio. I'll talk a little bit about myself and my background, and then we'll jump right into it. Alright, like I said before, I work at Twilio. If you're not familiar with Twilio, we're a large communications company that was founded eleven, twelve years ago — think SMS and phone calls — and we've grown from there. I've been there for about four years, and all in all I've been working on streaming architectures for about ten years, starting back at Yahoo. Some of the other interesting things I've done: at Twilio I brought a streaming-first architecture to the voice and video groups through a project called Voice Insights about four years ago, I also lead the Spark office hours at Twilio, and I've really enjoyed distributed systems throughout my whole career. Okay, today's agenda. We're going to take a look at: what does this streaming architecture look like at a high level? How is it actually different from the kinds of API-driven architectures people have seen before? And then we're also taking a look at like the actual technology that drives it, so what are protocol buffers, why do I like them? Why do I think you should like them as well? And also what is gRPC? What is RPC? What's a protocol stream? And then we're gonna also figure out how this actually fits into the Spark ecosystem. So that's kind of the agenda for today and we're gonna just kinda pop right into it.

Let's start with the big picture first — zooming out to look at what the architecture looks like, starting with streaming, the streaming microservices architecture. We're going to look at it as a span running all the way from the left side to the right side.

Streaming Microservices Architecture

So if you look at the span as a single flow of events from the client to the server and then through the entire streaming backend, that's what every single path through this big streaming architecture looks like. But we're going to focus on literally a single client-to-service relationship and how data traverses the streaming system. In the first box we look at the gRPC client. What is a gRPC client, what is gRPC? What is HTTP/2? What is protobuf? We'll pull apart the individual components of the technology that really drive this architecture. We'll walk through it end to end to really understand how it works, but this is a very high-level overview. We start with the gRPC client — that's a client that could run in an iOS SDK, it could also run in JavaScript. It doesn't really matter; it could be a client running on a Java server. That client connects and makes remote procedure calls against the gRPC server, and the gRPC server communicates with the gRPC client through something called protobuf. The protobuf messages are now passed from the client to the server the same way you'd expect to see a normal method call on a client like an SDK. So it kind of encapsulates this client to server relationship without having to really think about where the server lives, which is really, really cool, plus it's lightning fast. Aside from there, once everything hits the server, things are (mumbles) to Kafka. We'll talk a little bit more about Kafka, but I think everybody who's at Spark Summit also realizes and knows about Kafka and also the importance of Kafka. But once the protobuf is in Kafka on an individual topic, it (mumbles) gets picked up through Spark. Spark can natively interact with protobuf through something called ScalaPB, and ScalaPB allows you to actually convert your protobuf messages directly into a DataFrame or a Dataset within your actual Spark application. So you have this kind of entire end to end system getting everything into Spark. Once things are in your actual Spark application, it's really easy then to interact with anything either within the Spark ecosystem or outside of the Spark ecosystem, just by converting the individual data frames to parquet and dropping it in something like HDFS or S3, or wrapping that all with Delta. Alright, so we're gonna take a look a little bit further: how does this actually expand? So you start off with a single kind of span-level architecture, and the span-level architecture allows you to have that client-to-server relationship, with the server passing an individual message to a topic in Kafka. Once something's in Kafka, then things get really interesting. So you take a look at this architecture — this is basically just a reconfiguration of exactly what we were looking at before. So once things are in HDFS, like we take a look at the bottom row, or if things are in Kafka in the top row, there's the ability to actually pass these messages, whether it's from the gRPC client to the gRPC server, to Kafka, to your Spark application, but you can also pass it back and forth. So you can go back to Hadoop, or you can take your parquet data and read it back from Hadoop again, back into an application. And what we've done at Twilio is we've used this architecture to be able to message pass reliably between the individual streaming Spark applications, without actually the necessity to schedule a bunch of different batch jobs like you'd expect in something like Airflow, or even just a crontab, for the different Spark applications. The cool thing too is that at the very end, once you're ready to kind of close the books on whatever the message is, that can then go back through gRPC, because gRPC enables bidirectional communication. So you have the ability to also kind of peer on the end result of a Spark job, and then you can (mumbles) pass it back down to a client once a specific job has been run through from end to end. We're gonna take a look at what that actually looks like as we drive deeper into the individual components of this architecture, but at a high level, it's literally a message from a client to the server.
That’s encapsulated through a remote procedure call, which basically kinda gets rid of that edge, which is, client and server, from there, when things go into Kafka, then, it’s really up to you what you want to do with that. But the kind of end result is that you can really reliably pass data back and forth with compile time guarantees, which is really nice, especially for a system that’s online and running, 24/7, 365.

Okay, now we're going to talk about protocol buffers — what are protocol buffers, i.e. protobuf. Why use them? If you're not familiar with protocol buffers, you may only be familiar with things like JSON and CSV. JSON and CSV are structured data, but not strictly structured. Those kinds of types basically change and mutate over time, which can break your downstream systems with one bad upstream commit. The cool thing about protobuf is that it basically compiles down and it's language agnostic. Think of a common message format you can use — in this example, to encapsulate what a user is. A user can be anything in your system, and you can define a user however you want to, but the big win here is that you have this language-agnostic message format that lets you compile into the language of your choice. So if your server-side language of choice is C++, or maybe Java, but your client libraries are all written in Python or Node.js, this allows you to use the same message back and forth. So you don't have to worry about a bunch of different APIs trying to ingest APIs written against JSON, APIs that might change, so that any downstream change basically puts everything else at risk — one bad commit, one bad push that gets rolled back and then breaks your downstream systems. Protocol buffers are basically an idea that came out of Google, and the nice thing is that you can inject them into any of your libraries as a compiled asset or resource of that library. Then it allows you to really do anything that you want to, without having to worry really about how does that data change as long as you're abiding by some of the guidelines. So I would recommend taking a look more at protocol buffers, just take a look at like developer.google.com, and you can find the protocol buffers there as well. And that gives you a really good overview of how to actually use them. But really like the big win and the big takeaway is that you have the ability to compile down and have versioned data that can flow all the way through your system, across the actual language boundaries. So from Java to Node.JS and back and forth, et cetera. So they're really cool, we're gonna take a little bit more of a look at them.
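
As a quick illustration of the point above — one message definition compiled into every language in the stack — a user message could look something like the following sketch; the exact fields here are an assumption, not the talk's actual schema:

```proto
syntax = "proto3";

// Hypothetical shared "user" message: the same definition is compiled for the
// C++/Java server side and the Python/Node.js client side.
message User {
  string user_id = 1;
  string display_name = 2;
  int64 created_at = 3;
}
```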

So, like I said before, you have a language-agnostic message type that can be compiled into the language of your choice. The cool thing is that you get the ability to auto-generate the different builder methods or scaffolded classes. Think of a Java builder, like the one we have on the right — a Java builder being used from Scala, where we have the val, val user. One way of looking at this is basically saying: I want to create a new user. Now I have a new builder, and all the methods that actually create an immutable structure are basically written for you. So you really don't have to do anything. It's like saying, be lazy, it's fine — just write the message, compile it down, and now everything is done for you. So you save time, it's accurate, and things can be versioned, which is really nice. Aside from that, protocol buffers come with their own serialization and deserialization libraries. So you don't have to worry about how that works in your language, because the actual libraries handle protobuf themselves — across Node.js and Python, Java, Scala and so on, they all have their own way of serializing and marshaling those objects, so you don't have to worry about it. So you get a lot for free — you basically get an entire serialization system for free that's lightning fast and super optimized, which is fantastic.
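
For example, a minimal sketch of what the generated builder could look like when used from Scala, following the hypothetical User message above, with the Java-generated classes on the classpath:

```scala
// Everything below is generated by protoc from the User message; nothing is
// written by hand. The builder produces an immutable message.
val user = User.newBuilder()
  .setUserId("u-123")
  .setDisplayName("Scott")
  .setCreatedAt(System.currentTimeMillis())
  .build()

// The bundled serialization library round-trips the message to bytes and back,
// so no hand-written serializer or deserializer is needed.
val bytes: Array[Byte] = user.toByteArray
val roundTripped: User = User.parseFrom(bytes)
```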

Okay, now we're going to look at gRPC and what a protocol stream is. This builds on everything we've seen so far. So it uses protocol buffers, and this is basically the first look at that client-to-server gRPC relationship.

What is gRPC? gRPC attaches to protobuf — it's a way of defining your server contract within protobuf. If you think about RPC, the idea behind RPC is a remote procedure call.

gRPC

What does that mean? If you're a client and you have an SDK, the SDK has methods that will do something where there's IO involved, and there's an almost invisible boundary around how the client sends messages to the server. Let's look at an ad-tracking example — this is something I did a lot back when I was at Yahoo. Back then we used JSON; now we'll look at how to do it with gRPC. As an example, if you have a gRPC client running in a JavaScript SDK and you want to track how customers interact with ads — for growth marketing and things like that — then the gRPC client doesn't have to worry about how to compose objects for the server; it just creates a protobuf and sends it through the gRPC client to the server. No worrying about: what kind of HTTP library are you using, does it work across different versions of JavaScript, are people using the right library version? All of that is encapsulated within the whole gRPC process. gRPC stands for general-purpose remote procedure call — I like to think of it as Google remote procedure call, because it was also created at Google. And it's used to power a lot of the services that people actually use nowadays, from TensorFlow, to CockroachDB, and across the board to Envoy as well, which is using HTTP/2 and protobuf as a way of doing a reliable proxy. But it's really, really cool because if you compile it down, you don't have to worry about writing scaffolding for a lot of your different server-side libraries or client-side libraries, given that you can actually still compile it down like you would your protocol buffer to create your scaffold — so your interfaces in Java, and vice versa. The other really kind of cool thing too is, as I talked a little bit about before, you have the ability to do this bidirectional streaming, and this doesn't come necessarily out of the box by default, but there's pluggable support for it. So if you think about your server pushing down to your client, then your client responding back and pushing back to the server, and having this whole entire communication — that's one of the things that gRPC actually powers. So it actually takes a lot of the effort out of, how do you do channel-based communication? So for example, say you're running a different application where a customer has logged in and they want to get, say, stock data. Now your server can just push that down for people who are listening to the specific stock — say, I don't know, say it's some NASDAQ stock of some sort. Then you'd just be able to get those stocks and be able to take a look at that as they're updating in real time, just by subscribing to a channel with the stock tickers that you're actually tracking, which is kind of nice, and then the server side does one thing and then broadcasts it across all of the individual peers within that individual channel. So that's like the whole bidirectional part. And given that that's streaming, and you can connect it to your backend, which is also streaming, running Spark jobs that are streaming — you have this kind of end to end stream. That really kind of opens up the boundaries to a lot more things that were very complicated in the past, and just adds a framework on top of that.
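
As a hedged sketch of the bidirectional, channel-style idea described above (this stock example is hypothetical, not from the talk's codebase), the contract could look roughly like this:

```proto
syntax = "proto3";

message QuoteRequest {
  string symbol = 1;   // e.g. the NASDAQ ticker the client wants to follow
}

message QuoteUpdate {
  string symbol = 1;
  double price = 2;
  int64 timestamp = 3;
}

service StockTicker {
  // Both sides keep a stream open: the client streams subscriptions,
  // the server pushes updates to every peer on that channel as they happen.
  rpc Subscribe (stream QuoteRequest) returns (stream QuoteUpdate);
}
```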

Like I said before, we're going to look at an actual example — a gRPC example — and the one I want to show today is basically ad tracking. Let's look at what the message looks like. This is the interface between our client and our server. We'll look at the actual server code that accepts these messages, acknowledges them to Kafka, and then sends a response back to the gRPC client. It's a simple example that shows how little effort it takes to build something robust and powerful in your ecosystem. It's a really smart approach to data engineering: it makes sure that everything you want to enter the system enters the system as a defined, versioned packet of data.

Okay, gRPC defines messages, just like we saw before in the user example — everything is basically a message (inaudible), message AdImpression. We'll take an adId, which should be a string, (inaudible) there might be a contextId, like what page or what category of page the ad was actually shown on, a potential userId or sessionId so you can track this back to an individual — GDPR first, so make sure the user identifier isn't something the user doesn't own, like my social security number or something else, but something that makes sense for the users you're targeting; it could be an IDFA or something else from the (mumbles) world to identify a user — and a timestamp. You take all of that information tying some notion of user context back to a single ad at a specific time. That's basically the ad impression. So there are only four fields. Then you have the normal response, which is a status code — a typical HTTP status code — and, or, it can be a status code that you create for your own server as well, to allow you to identify different types of problems or different issues that come up. So you have this kind of relay back and forth. Plus also a message, so if you want a human readable message to respond to your client, telling them why something didn't work or telling them that something did work, all of that's possible as well.
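
Reconstructed from that description, the two messages would look roughly like this (field numbers and exact spellings are assumptions):

```proto
syntax = "proto3";

// The four-field ad impression described above.
message AdImpression {
  string adId = 1;       // which ad was shown
  string contextId = 2;  // the page or page category the ad appeared on
  string userId = 3;     // user/session identifier (GDPR-friendly, e.g. an IDFA)
  int64 timestamp = 4;   // when the impression happened
}

// The generic response: a status code plus a human-readable message.
message Response {
  uint32 statusCode = 1;
  string message = 2;
}
```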

Okay, so we have our impression — I still have it on the right side of the screen. Like I said before, basically you have protobuf, and then you also have the ability to define your service as protobuf. This is where all of it comes together. So we look at this as the server definition or service definition. If we have a ClickTrack service that's taking an impression, then what we do is — look at the top — service ClickTrack, the click tracking service. Now we have a named service. Our RPC method is AdTrack; the AdTrack method takes an impression and returns a response. If you think about it, basically nothing in this definition can be misinterpreted, because the client has to create an impression, it has to add the adId, contextId, userId and timestamp, or the server can validate it and say, this isn't correct because you've added incorrect data — maybe it's an empty string or something like that. And your response is always gonna be exactly the same response for the compiled version of your client code and for your server code and for the protobuf at a specific version. So if you consider versioning, like people would do across the board with, say, Maven versioning, or any kind of versioning in Artifactory, then if you're using some (mumbles) or something else, then if you're doing a non-breaking change, great, everything will always work. If you're doing a breaking change, then coordinate that with downstreams so that when an upstream changes, nothing breaks down the stream. But if you're following really strict API-style best practices, then no matter what you do, everything should be backwards compatible anyways — protobuf works backwards compatible. So potentially if you're trying something new and you're Canary testing, say, a new field in your AdImpression, other services downstream that have an old version of the protocol buffer for your AdImpression wouldn't know that you could even add a new message type. And anybody else interacting with that AdImpression would have something called an unknown field, and they would not have to do anything with it, which is really great. So it allows you to basically opt in, if you want to, for specific messages where different fields are potentially being added, and it's a really nice way to be able to not break a service which is in production just because you wanna try something new, and you're working on, say, a rollout that's maybe opt-in with feature flags. So all of this stuff basically comes out of the box if you're following the protobuf and gRPC standards.
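
Continuing the same .proto, the service contract being described would look roughly like this sketch:

```proto
// A named service with a single RPC: AdTrack takes an AdImpression and returns
// a Response — there's nothing in the contract left open to interpretation.
service ClickTrack {
  rpc AdTrack (AdImpression) returns (Response);
}
```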

Great — so say now we have this interface for what the message looks like, what the impression and the response look like, and we have a service, basically the AdTrack method on the ClickTrack service. When we compile the gRPC definition — I compile it with Akka gRPC for Scala — all that does is basically take the individual definitions from the protocol buffer, the .proto file we looked at earlier, and once it's compiled down, it scaffolds the interface for my click tracking service. Let's look at the class itself. We have the ClickTrackService implementation, which takes an Akka Materializer — I won't go into detail because this isn't a talk about Akka — but it extends the ClickTrackService, which is what we defined in our RPC contract. When we extend the ClickTrackService, we basically get that method scaffolded, and all we have to do at that point is fill in the blanks: what is AdTrack? We look at the override definition of AdTrack. We take the gRPC data in, which is our impression. We know that's exactly what AdTrack should be receiving, because we defined it in the RPC definition. All of this is saying we wrap the response in a Future. So at some point in the future, unless everything fails, you'll get a response back to the client. It's basically an asynchronous contract. So what we're looking at right now, if we kinda go through the code, is basically just a promise of a response. Before, we took a look at the high-level big picture of the architecture; we took a look at it from client to server, server to Kafka. This is all that is showing. And it's also a dummy Kafka client, just to kind of show you how it would work. So where we have KafkaService publish record, KafkaRecord, with the Try, which is a Boolean, it's gonna always just return true. This would be connected to a real Kafka client — you'd be doing much more on your end to do this — but that wouldn't fit in the example. So Kafka.publish, we're sending a KafkaRecord, which has a topic of ads.click.stream. So we're sending basically just binary data to Kafka. We've got a binary topic, and then we've also got a binary data payload. So everything's binary, it's lightweight. The nice thing is that, as we saw before, because protobuf comes with its own serialization library, it's very quick to be able to serialize to a byte array. So we get basically binary data in, we have the AdTrack data. We can do what we want prior to actually sending the stuff to Kafka. So say, for example, you want to add a server-side only field to the actual ad impression. You could do that — say it's stamping a timestamp of the time that you received that message. With all said and done, once you basically take that AdImpression record and you call toByteArray on it, you now have a ByteArray, so you have binary data that is versioned and compiled to a specific version of your protobuf, on a specific topic, which is ads.click.stream. So if this is a successful response and we're able to publish this to Kafka, then the 200, okay, comes back, which would be like a normal kind of HTTP code for your client, encapsulating the fact that you have published the AdImpression that you wanted to record. That's fairly lightweight. I think all in all, this is about maybe 80 lines of Scala. It could, of course, be a lot larger, like in a normal kind of production use case with validations across the board and everything else, but this gives hopefully a quick kind of tidbit into what it would actually take to implement the service and actually go run it.
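
A heavily condensed sketch of that server code might look like the following; the dummy KafkaService, the ads.click.stream topic, and the 200 response follow the talk, while the trait and type signatures are assumptions standing in for what Akka gRPC and ScalaPB actually generate:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Dummy Kafka client from the talk: "publishing" always succeeds.
final case class KafkaRecord(topic: String, payload: Array[Byte])

object KafkaService {
  def publish(record: KafkaRecord): Boolean = true // a real producer goes here
}

// Stand-in for the interface Akka gRPC scaffolds from the ClickTrack service;
// AdImpression and Response are the ScalaPB-generated case classes.
trait ClickTrackService {
  def adTrack(in: AdImpression): Future[Response]
}

class ClickTrackServiceImpl(implicit ec: ExecutionContext) extends ClickTrackService {

  // Fill in the blank: take the validated impression, forward it to Kafka as
  // binary protobuf, and answer the client asynchronously.
  override def adTrack(in: AdImpression): Future[Response] = Future {
    val record = KafkaRecord("ads.click.stream", in.toByteArray)
    if (KafkaService.publish(record)) Response(statusCode = 200, message = "ok")
    else Response(statusCode = 500, message = "failed to publish impression")
  }
}
```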

On the right-hand side you'll see exactly the same screen as before, but now we'll go a bit deeper into what a protocol stream is. So if you think about the client-to-server relationship as basically the data pipeline for your impressions, that data flows basically directly from your client to your server, and from the server basically everything flows into Kafka. At that point, everything has basically entered the stream, and that's basically the entry point into your streaming data lineage pipeline, or whatever you have at your company. The nice thing is there's no random raw garbage data in Kafka. In the past, a lot of people have probably struggled with garbage going into and out of the system. So the real benefit of putting gRPC in front of your data lineage pipeline, or whatever your company's reliable data pipeline is, is that it allows you to have actual data contracts, from client to server and from server to Kafka. At that point, given that the protobuf has the ability to be validated on the server side, no garbage will enter the system, and if nothing bad enters the system, downstream consumers don't have to be defensive, as long as you abide by really good best practices. That removes a lot of defensive programming, which makes everything simpler, because you don't have to process the data first and remove the garbage data. You can do that before anything actually hits Kafka, which speeds everything up downstream across your whole system. The other nice thing is that you can use this for anything you can think of. So in the abstract use case, they can all go into real-time personalization and predictive analytics for what to show next for the ads, based on how people have actually interacted with them, really in real time. So, creating really whatever you want — it's all at your fingertips.

So these are the components we've seen so far in the gRPC architecture. We've talked about the service, we've talked about AdTrack and our tracked ad. What that does is basically run a remote procedure call on the gRPC server, which has that ClickTrackService implementation with that AdTrack method for the tracked ad. It's a binary message, it gets read as binary data on the server side, and it has a tiny footprint — unlike a huge JSON payload — and all of it can be validated very quickly because the fields are all expected. Then it can go into our ads.click.stream. The ads.click.stream itself is still protobuf, still binary. So it's no different from what we saw going from the gRPC client to the gRPC server. We haven't really changed anything, and if there's no mutation, things move really fast. So at this point, nothing changes from the gRPC server to Kafka. So it's just binary, validated, structured data, which is really, really good as a kind of start to your whole data lineage pipeline. And especially in Spark, it's nice to not have to worry about being overly defensive with your streaming data, because streaming systems can go down at any time.

Constructing a protocol stream:

Now we're going to talk about how to wrap a nice shell around this whole concept. Given that this is Spark Summit, and given that people really love streaming, and Spark does structured streaming really well, we're going to talk about Structured Streaming and protobuf, because it's really the icing on the cake for this whole architecture. So like I said before, once everything is in Kafka, we have a topic that's actually bound — it's a topic of protocol buffers, so it's a protocol stream. A structured protocol stream.

With Structured Streaming itself and protobuf, the nice thing is that you can actually use what Spark already has built in internally through the expression encoder library to take protocol buffers — in this case it's a Scala example compiled through ScalaPB, and ScalaPB is a compiler library that takes your protocol buffer definitions and (inaudible) generates case classes.

Structured Streaming with Protobuf

Given that a case class in Scala extends Product, and Spark runs with product encoders, you can actually implicitly bring in any kind of product encoder by calling into the Spark session — you basically import it from there. Or you can explicitly generate that implicit import, so you don't have to worry about doing that at runtime and you know exactly what you're expecting. Here I have implicit val adImpressionEncoder, an expression encoder that takes an Encoder of AdImpression. All it does is wrap the product encoder that ships for free with Spark. That gives you native interoperability with protobuf in Apache Spark through ScalaPB, and it then also allows you to work directly with the data coming from Kafka — from client to server, to Kafka, and into your actual Spark application. From there, you can do aggregations on individual users. You can prepare features for machine learning and send them through your machine learning pipeline. And then everything can roll back out to your ad server, which can also be the gRPC server. So it really connects the dots of the whole streaming system in a really nice way, and everything behaves the way you'd expect things to actually work.
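
A minimal sketch of that encoder, assuming AdImpression is the ScalaPB-generated case class for the message shown earlier (for messages with more exotic fields, ScalaPB's sparksql-scalapb helpers are the usual route):

```scala
import org.apache.spark.sql.{Encoder, Encoders}

object ProtoEncoders {
  // Every Scala case class extends Product, so Spark's product encoder can be
  // reused directly for the generated message type, as described above.
  implicit val adImpressionEncoder: Encoder[AdImpression] =
    Encoders.product[AdImpression]
}
```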

So, like I said before, native is better. With strictly native Kafka-to-DataFrame conversions, there's no need to convert intermediate types. If you've ever worked with a type that Spark doesn't support, a lot of the time what you'd do is write a conversion method that takes some kind of binary data out of Kafka, and then you mix in your own serializer or deserializer. And from there, you either have to stick to the RDD API only, or you have to actually write your own fairly complicated expression encoder. Back in 2018, I gave a Spark Summit talk about doing streaming trend discovery, and in that codebase there's actually an example of how to bring your own native protobuf encoder into Spark, but in this case we're just looking at ScalaPB because it's much simpler. On the right, what I have is basically an example of a structured streaming job that brings in the AdTrack information and runs it through an actual machine learning model. If you look at the right-hand side, we have our query, which is our streaming query — basically the input stream of our Kafka data. We've loaded our data, and now we just convert the data into an impression and then join it on adId with something called contentWithCategories. contentWithCategories is an ETL process keyed on adId that pulls in more information about what's encapsulated in the ad. From there, we have all the information we would need to transform this data across our serialized pipeline. So there are going to be a lot of other talks, probably over the past few days of the Spark Summit, that have talked all about how to actually create a pipeline, and how do you actually load and save pipeline models, and how do you do, say, a logistic regression model, linear regression model, et cetera. So this use case is actually just showing what it would look like from the data engineering or machine learning engineering point of view, once those models and pipelines have been serialized off and you're bringing them in. So it's all in stream: join on adId, transform to basically run our pipeline transformation, and then we're going to basically just predict. So in the case of, say, ad serving, we want to predict the next best ad for this individual user. And then from there, we're just gonna go write this out as parquet, and we're gonna dump that into HDFS or Delta and allow another system to pick that up, which can also do that in real time. So at this point, once everything hits Kafka, once it's been processed and once its predicted label has been applied to that data, it can get picked back up — you have this kind of full end to end cycle, which is basically serving your ads. And the other nice thing too is that, given that there's really nothing that should change as long as best practices are enforced, you can actually rest at night knowing that the pipelines are actually safe. And that's, I think, one of the biggest things that people worry about, especially from a data engineering point of view: a small mutation creates a butterfly effect that can take down an entire ad server. And to speak on ads — ads are money. So that's something that you don't actually really wanna do. So if you put the right kind of safety precautions in place across the whole entire system, you don't have to really worry so much about that, and it's really good for everybody who's in the downstream path as well.
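
A condensed sketch of that streaming job — not the talk's exact code; the topic follows the example, while the paths, Kafka address, and model location are assumptions:

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession

object AdTrackStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ad-track-stream").getOrCreate()
    import spark.implicits._
    import ProtoEncoders._ // implicit AdImpression encoder from earlier

    // Static enrichment data keyed by adId (stands in for contentWithCategories).
    val contentWithCategories = spark.read.parquet("/warehouse/content_with_categories")

    // A previously trained and serialized pipeline, loaded back in for scoring.
    val pipeline = PipelineModel.load("/models/ad-ranking-pipeline")

    val impressions = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "ads.click.stream")
      .load()
      .select($"value").as[Array[Byte]]
      .map(bytes => AdImpression.parseFrom(bytes)) // ScalaPB bytes -> case class

    // Join on adId, run the pipeline, and write predictions out as parquet for
    // the next streaming or batch consumer to pick up.
    val query = pipeline
      .transform(impressions.toDF.join(contentWithCategories, Seq("adId")))
      .writeStream
      .format("parquet")
      .option("path", "/warehouse/ad_predictions")
      .option("checkpointLocation", "/checkpoints/ad_predictions")
      .start()

    query.awaitTermination()
  }
}
```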

Another nice thing about Structured Streaming with protobuf is that you can also use your protobuf definitions. Taking a step back, if you have a protobuf that is both the input format and the output format, you can use those two protobuf objects to validate strict output types. Delta already does this through versioning — the underlying struct type is stored in the Delta Lake — but if you're not using Delta yet and you're using something like HDFS, this matters even more, because it's not easy to change the format of your parquet data over time. One thing we found on my team at Twilio is that if we have a definition that can be written out as DDL as part of the DataFrame schema, it's easy to read the data in and then validate it. At the top right I have this method, def validateOutput, which takes a DataFrame and returns a Boolean: the DataFrame's schema converted to DDL equals our report struct type DDL, and that struct type DDL is basically versioned with every version of this actual Spark code. From there, if anything has changed, then it's invalid. Once it's invalid, we log it as invalid, or more likely I'd push a metric to Datadog or another monitoring service to let us know that something broke in the pipeline. But in fact it's even better to do this at the gRPC level, so that whatever is invalid never actually enters the pipeline — otherwise you have to be defensive — but once you have a running Spark application that's only exchanging parquet data, then parquet itself can be converted into your DataFrame and still be validated against the protobuf, or against the struct type itself. So I think one of the really big keys is: as long as you know exactly what the inputs and outputs of your system are, it's easy to create something that looks complicated from the outside but is very solid from an operational point of view once it's up and running, as long as you stick to strict best practices — and it's often easier to write a wrapper library that enforces some kind of strict data policy, which is really important. Okay, now let's look at a real use case, a concrete example from my team — a Close of Books job from the Voice Insights team at Twilio. So what we do is we have a job that runs pretty much at the end of every single day, just revalidating that all the data that was coming in through our streaming pipeline was actually accurate. And the nice thing about this is that we have this known notion of a Close of Books, and all of this data can actually be written out into kind of a final output stream, which can then be reused to run machine learning jobs and everything else against. So we take a look at the method on the top right: we have a run method, and the run method basically creates a streaming query. And this is taking our call summary data. So I work on the Voice Insights team, we have CDRs, which are call summaries, and that call summary itself is basically just loaded from our parquet stream. And the parquet stream itself is then just kind of compiled down — it's deduplicated — and we push that back into a final stream, back into HDFS. So it's just an example of how to do that, and it's giving you an idea of how we've actually wrapped this into our data engineering pipelines. So I thought it'd be kind of interesting to show this after showing the AdTrack and some of the other example use cases, to show what a real-world use case looks like. And it's actually fairly trivial and fairly simple to take a look at that. So, what we've looked at today is kind of the end to end gRPC pipeline, to Kafka, to Spark, and then final output to parquet and other Hadoop (mumbles), and I really believe that this pipeline works really well, especially as things get more and more complicated.
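
A minimal sketch of that schema check; the DDL string here reuses the four-field impression for illustration, whereas the real job validates the versioned call-summary report schema:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

object OutputValidator {
  // Versioned alongside the application; regenerate it whenever the protobuf
  // (and therefore the output struct type) is intentionally changed.
  val expectedDdl: String =
    StructType
      .fromDDL("adId STRING, contextId STRING, userId STRING, timestamp BIGINT")
      .toDDL

  // True only when the DataFrame's schema matches the expected DDL exactly;
  // on a mismatch the job would log or emit a metric (e.g. to Datadog) instead
  // of writing bad parquet downstream.
  def validateOutput(df: DataFrame): Boolean =
    df.schema.toDDL == expectedDdl
}
```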

Streaming Microservices Architecture

So a lot of people are going to start small — you might have a single service, maybe one server, two or three Kafka topics, maybe one Spark application — and as you add more and more data into a system like this, it helps if you follow the model of basically treating everything as a span: from your client's method request to the server, from the server to a Kafka topic, from the Kafka topic into one Spark application, or many Spark applications like we saw before, and finally into a source-of-truth store like Hadoop, or Delta Lake running on top of Hadoop, in parquet format. That gives you a reliable channel that can keep (mumbles) scaling and building more complex data lineage pipelines, or a whole class of data lineage services like you'd have at your company, while still looking at something as simple as what we're looking at right now. Now let's do a quick recap.

To recap, we looked at protobuf. As a refresher, protobuf is a language-agnostic way of structuring data by creating common message formats. They have compile-time guarantees, which means that once it's compiled, it can't change for the version it was compiled at. It also brings fast serialization and deserialization to the table, which is great for sending data from the gRPC client to the gRPC server, and then from the gRPC server on to Kafka, brokered as a binary message. Like I said before, gRPC is also language agnostic, so it follows the same characteristics as protobuf: you can create a common format that you compile down into the language of your choice. It's also extremely low latency, given that it runs binary data over HTTP/2, and it has compile-time guarantees as well, so the API doesn't change for the version that's actually compiled, which lines up with what protobuf provides. So the compile-time guarantees are on your message format and on your actual server interaction layer, plus it's a super smart framework with a lot of momentum behind it. We also looked at Kafka. I didn't go deep on Kafka, because everybody who's been to a Spark Summit, I promise, knows what Kafka is, but as a super quick recap: it's highly available and it has a native Spark connector, so it takes basically no effort to create a connector, whether you're producing to Kafka or consuming from Kafka. And you can use it to pass records to one or more downstream services, which is really great if you want to test different things end to end. Last, we took a look at Structured Streaming, and this has been out for a long time now, but it's a nice way of reliably handling data en masse. The nice thing is that you can do protobuf to Dataset and DataFrame conversions natively by using something like ScalaPB, or a library of your choice, or you can choose to write your own. It's also really nice because everything can go end to end as protobuf, and from there, the protobuf to DataFrame to parquet is also native in Spark. So it allows you to take everything that you've worked hard on and reliably store it, so that you don't have a corrupt data lake or data warehouse, for future processing of your valuable data assets.

And that's it. Thank you so much for coming to Spark Summit this year. I'm really glad you could learn more about gRPC, protobuf, Kafka, and Structured Streaming today, and see some of what my team has been doing for the past four years in terms of building a pipeline like the foundation I've shown today.

About Scott Haines

Scott Haines is a full stack engineer with a current focus on real-time analytics and intelligence systems. He works at Twilio as a Senior Principal Software Engineer on the Voice Insights team, where he helped drive Spark adoption and streaming pipeline architectures, and helped architect and build out a massive streaming and batch processing platform.

In addition to his role on the Voice Insights team, he is one of the software architects of Twilio's machine learning platform, where he is helping to shape the future of experimentation, model training, and secure integration.

Scott currently runs Spark Office Hours company-wide, providing mentorship, tutorials, workshops, and hands-on training for engineers and teams across Twilio. Prior to Twilio, he wrote backend Java APIs for Yahoo Games, as well as a real-time game ranking/rating engine (built on Storm) serving personalized recommendations and page views to 10 million users. He finished his tenure at Yahoo with Flurry Analytics, where he wrote the mobile alerting/notification system.