并行化的结构化流使用三角洲湖工作

下载幻灯片

我们会解决这个问题的运行流使用砖三角洲湖的工作从另一个角度来看,在检查当前的一些问题我们面临Tubi在运行规律的结构化流。一个快速概述为什么我们从拼花数据文件转换为δ,它解决的问题我们在运行流工作。后将我们的数据集转换为三角洲湖。我们将探索技术,我们可以通过提交多个流媒体集群利用率最大化工作从司机使用scala并行集合来并行运行。我们将讨论技术来编写和实现幂等的任务可以并行。最后,我们将讨论一个高级主题上运行一个平行流回填工作,处理故障和恢复的细微差别。演示使用砖笔记本将显示整个演示。

看更多的火花+人工智能会话

免费试着砖

视频记录

,下午好。这是从tubi奥利弗·刘易斯。

并行化的结构化流工作

我们会讨论并行化的结构化流使用三角洲工作。首先,我们要讲的是tubi是什么?所以tubi广告支持的视频点播服务。我们最大的在线目录的免费内容。就像Netflix,我们的大部分收入是由于内容的建议,让我们的观众个性化体验。我们的推荐引擎。我们连续过程一组大型流数据的客户活动。我将向您展示这些事件是什么样子。但大约每秒40000个请求,总量每天大约8亿条记录。和我们的数据大小,= =每天大约500字节。 So when you’re thinking about these events, we have like a pretty robust architecture to ingest these events. We get all these client events hitting our backend service. Which then sends it to a Kinesis Queue, which then hits our enrichment service. Where they are joined with a lot of datasets. And then those data sets are finally ingested to from a consumer spark. The way we’ve created the job is it’s a streaming job that reads these events and writes it into our data Lake. The initial problems that we had with that was, that our event out here is an immutable event, right? We have the language Symantec is that we have a person watching content. So this helps us make it very extendable. And the way we wanna have is like an expressive vocabulary that describes a user action. So if you just go to another example, if you see this example out here, if you have a person watching content on a platform, and that helps us design easily immutable events that can be changed and tweaked. That will eventually reach our data Lake. And we can generate insight from them. When we have, the pipeline design like this, we have a spark that’s running as a streaming job that has a trigger timestamp. And the trigger kind of has issues with file sizing. We tend to have too many small file sizes. And this was something that we were always facing a problem with. And we needed to find a better way to resolve that. I’ll be showing a bit how Delta really helped us solve that problem. Another problem was Data Deletion process. So anytime we needed to make changes to that data. And we needed to reprocess the data. We would have issues with the streaming jobs or managing the state. And we needed to kind of rerun the entire script all over again. Multiple Streams writing to the same location. This was also not easily possible with the first version of Structured Streaming. That’s just because we could not have the folders being updated at the same time. We had like a hacker way of using separate checkpoints to maintain these different streaming jobs. But it still wasn’t a very straightforward method. And that is something where we started using Delta. And Delta came with its own set of features also, that really helped us make our decision to using Delta. The first thing that it solved was we were able to optimize the table and vacuum all of SD files, without interrupting the streaming jobs. We also did not need to worry about the transaction consistency of Delta. Because long running deletes or changes to history were able to be done without impacting the streaming data coming in. And also as most of you know, with the GDPR and CCPA changes that are being done, for most of the data deletion right now. We were able to support that using Delta just because of its transaction lock capabilities. Another thing that I’ve mentioned in the previous slide about not being able to run multiple jobs in parallel. Because of the spark metadata issues that we had to deal with and consistency issues. We do not need to worry about that anymore, because we get full fledged transaction log capabilities using Delta log. So that was another really good reason for using for the entire transition into Delta.

一个简单的结构化流工作的例子

,这是我们开始我们的展示在实际的细节,因为这是一个示例,我们将使用这个演讲。这是一个实时流会话的工作,这将触发每10分钟。让我们想象一下我们写了这样的标准代码测试它,我们编写所有单元测试和部署到生产环境中。一切都很好。我们是内容和在生产中没有失败,集群大小是正确的。我们能够及时让过程记录实时足够了。但这份工作仍然缺乏一些东西。,业务团队不能够得到任何形式的历史报告,对吧?因为没有为我们的数据集进行比较分析。现在我们需要找出一种方法来回填数据。这实际上是最主要的,我们正在努力解决使用三角洲非常结构化的和支持的方式。 Because this was not possible with the first version of the way we had written our code, but it’s kind of easily supported with Delta. So let’s talk about how we manage that with Delta.

策略来回填

因此,当我们试图策划做回填工作。想到的第一件事是让我们写一个批处理作业,执行回填。然而,写一个批处理作业需要一个漂亮的重大修改状态管理代码。正如我前面所描述的,对吧?喜欢sessionization工作是使用状态进行管理。如果你有重写这个工作到批处理模式操作,它需要我们重写所有的状态管理逻辑到一个单独的批处理模式,支持和逻辑。它还会要求我们保持两个单独的代码库。首先这是我们的一个约束流的工作工作。而不必重写业务逻辑。所以保持流的工作工作。 We would still need to figure out a way in which we can gracefully terminate the job on completion. Because as most of you all know, with streaming jobs. The only way to dominate a job is to receive some kind of a termination signal or some kind of some monitoring signal that will say that “Okay, our job is terminated. We need to kill the job now”. But luckily for us spark gives us a useful method for doing this by using trigger at once. Which will load the entire data frame, run the job and then stop the stream on its own. Without us having to write special logic to terminate the job. There is one small Gotcha Moment out here is as most of you all know: Do not replace the readStream with a read and the writeStream with a write. Because implicitly flatMapGroupWithState is gonna convert this mapGroups. It’s gonna convert it into mapGroup function and you’ll lose state management entirely. So that’s just one thing you need to remember.

在回填大型数据集的问题

有问题在回填大型数据集。例如,如果我们说,开始日期至2016年,简和结束日期,假设当前的时间戳。当我们开始工作。有几个问题,第一个问题是工作可能跑了很长一段时间。阅读很多的传入的数据。和它开始执行逻辑工作失败。

这是我们真的没有想做的事情。因为我的意思是,它真的会讨厌我们?我们不希望工作失败后五到六小时的运行。如果一份工作想失败,你宁愿马上工作失败,竞选失败之前很长一段时间和消耗资源。这是一个我们想要避免的。所以你不想有一个工作,有一个很大批量的大小。我使用这个词批大小因为我想详细说明我们如何定义一个理想的批量大小几分钟。你一定也注意到的第二件事是,我使用状态管理工作的状态,我们使用sessionization。和状态管理不能保持如此大的国家。事实上,这项工作将会失败得更快。 So with that said, we want to think of finding the right batch size.

封装的任务。

所以我们想将数据封装到的东西更容易明智和更容易触发和完成。因此,当我们试图确定一个小批量的大小。我们希望能够将数据封装到一个任务可以被触发,执行和完成。另一个关键字,我们应该能够完成。因为这将使我们能够跟踪工作的进展。由于我们的数据是按日期划分,我们决定,只要保持一个任务的粒度级别的日期。同样重要的是,工作是很重要的,至少在这个任务级别和日期分区级别。这重播相同的工作,不会造成重复输出。另一件事,我们还需要跟踪,如果我们建立一个循环执行工作。如果有一定的失败在一个特定的任务,我们读它,我们不应该像副本或任何类型的失败在最后一批处理运行的输出。 So let’s think about the next thing that we need to talk about, its performance. To make the backfill go any faster. Our immediate intuition is to increase the size of the cluster. So let’s take this example, right? The example on the presentation right now. The image out there is showing 3,886 tasks. And we have 64 cores. I’m just giving you the cluster and the size. It was a 64 node cluster. So it took 64 core cluster, and it takes about 8.2 minutes to run this job. So if you increase this cluster size to have 3,886 cores, we can actually complete this job in about eight seconds. So our intuition that we had at the beginning to increase the size of the cluster was actually correct. However, there are a few caveats to this approach. If we increase the cluster any further, we start getting diminishing returns. And that’s something that I’m also gonna show in the demo in a little bit. But I also wanna talk about how we need to have a job running environment. But also make sure that our cluster size is at the right. Exactly It’s intuitively correct. And the way I like to think about this is like the pizza problem. So, I know it’s a little funny, but when I was trying to imagine this entire thing, I literally thought of the pizza problem. Where you have a group of friends around you and you have four slices of pizza. What we have is spark is really great at utilization . So all the four slice of the pizza are picked up immediately. But we’re still gonna have four hands that are idle. Because there’s just not enough pizza. But if you look at the number of cores, it’s always greater than the task. And you end up having a large cluster that is not being fully utilized. If you look at this legend of this chart, that I’ve zoomed in a little bit. If you look at the CPU is it’s averaging at about 29.9%. Which is about one third of the entire cluster.

如果你看看右边的图表,您还可以看到峰值CPU使用率。这应该是一个好指标你意识到也许正在充分利用集群。原因可能是,他们很多IO操作,就像在现实世界中工作。有很多的IO操作发生。CPU和IO操作。IO绑定。和IO操作有时可能会使用较少数量的核心。所以最终离开很多空闲的CPU核。说了这么多,然后我跑另一个分析计算单个分区。在我们开始看到收益递减什么大小? So this is a very arbitrary line that I drew out here. And you can see that, anything to the left of the red line shows a pretty good decrease in the time taken for the job to complete, right? The rate of decrease is much higher. Whereas everything to the right of the lane, shows a slower slope, a smaller slope in the degrees. And that’s kind of my assumption to show that where there is a diminishing return. So when I was trying to decide a single node, if I could run a backfill on a single partition, I would spin up a cluster of 32 cores. So if you just do the math. If I’m running three dates in parallel. I would want to spin up a cluster size of 96 cores. And we will see later on in the demo also, how this kind of fairs and what are the outputs that we see in brackets.

好吧。让我们继续下一个幻灯片。这是另一个指标和性能。当你想设计一个集群。当你试图设计并行作业你要增加数量的任务。你希望他们增加与集群大小以线性方式。

性能

这意味着在某个阈值,如果你增加班级人数更多。你想确保更多运行并行运行。而不是仅仅拥有一块东西在哪里不会增加性能。

,还有下一张幻灯片,我们说并行回填。

回填并行

所以你必须的大多数已经有上下文,我整个事情并行运行。和我把不重要的东西是分离的业务逻辑代码的执行逻辑代码。当我们花时间和执行逻辑的工作,我们也可以结构工作更详细的性能。我经常听到另一个问题是为什么不使用技术像一个调度程序来处理这些多个作业提交。快速的回答,我能想到的是,你可以建立一个调度程序像努力这些在不同的环境中运行。但是请注意,你需要单独设置监控每一个工作。这是你还需要分别跟踪。而在这种方法中,你可以在相同的上下文中。

实际上这是一个有趣的引用,而不是一个有趣的引用。但是这是一个很好的报价,我从Rob Pike读取。他做了很多工作与Golang编程语言。他指出,“并发不是并行性。“如果我们使用Scala的并行集合,

分析我们的工作。我们写了一个真的并行工作。这并不一定意味着火花将整个循环运行。原因是火花使用默认一个先进先出的调度策略。所以即使并行集合发射这些火花并行工作,但实际上火花调度器不得并行执行这些工作。

所以我们要搬到另一个策略就是利用期货和公平调度器池。默认情况下每个池将得到一个平等的集群,但是每个池内,工作将以先进先出的顺序运行。显然我们可以调整这些参数通过改变调度模式,minShare和体重。根据我们的策略。但在演示的示例中,我们将使用默认值。在这个演示,我将通过一个例子使用flatMapGroupWithState结构化流。我们会经过不同的操作模式,使用批处理并行集合和期货与普通模式。让我向你展示集群,我们将使用运行。第一个case类是输入数据,也就是输入数据。被送到sessionization工作。 The next one is our session state, which has been used by the flatMapGroup at state to maintain the state object. The next one is your output data, which is basically for the imitate records from your sessionization job. There are a few other case classes out here and variables, which I will explain. They’re more like helper variables, which I’ll explain when we use them. The next case class that I wanna explain is actually being used, specifically only because we’re using a windowing function with triggered at once. And that’s because when you do have a window function there could be some sessions that are still active in the state store. And have not been emitted out yet. So at the end of the trigger at once run, you wanna make sure you use the SQLContext to read the checkpoint location and the state folder. So you can read the open sessions that should have been emitted but have not yet been emitted. Because you wanna make sure that they’re added to the output. Just please make a note of that. That’s pretty interesting and important , and there’s a good reason for doing that also.

接下来是你sessionization函数这一个非常标准的函数所示最多的书。那就是如果我出色的状态,你要确保你删除状态,并将它添加到仲裁员,如果状态存在。你想更新状态。或你想要创建一个新的状态如果它不存在。在接下来的步骤将会更新现有的国家与新国家豁免。和更新超时时间戳。下一个函数,我们使用的是运行流查询。运行流查询使用水印。正如我提到的,对吧?我们使用水印的12小时。 So we need to use the state steward writer to flush out, open records. We also grouping by device ID and we’re using flatMapGroupingState with the group state timeout as an event time timeout. And we’ll be parsing the sessionization function to this flatMapGroup at state function. We are also setting the trigger out here as to trigger at once.

下一个函数运行状态存储查询,这实际上是将像一个邮政查询上述功能。使用这个真正的原因是你想确保你添加所有的开放状态的状态存储为每个运行相同的输出位置。好的,现在让我们讨论不同的操作模式,对吧?和第一次的操作模式是你的批处理模式。你可以看到,我遍历三个日期,第一,第二,第三的可能。我们会初始化输出文件夹。这是由于在三角洲最终一致性问题。你想确保你之前更新三角洲日志运行单独的运行。

让我们来谈谈日期内的封装。如果你注意到我使用日期、开始地图。所以我要与整个集合进行迭代的日期,按顺序,为每个人。我将运行一个查询步骤之前,会删除现有的文件夹。如果它已经存在。我将作为第二步运行流查询。然后我将运行一个查询的步骤。我的意思是,所有这三个步骤将运行在每个日期,对吗?所以无论哪个日期扔在这个函数。我要运行这三个查询。 So that also helps you visualize that. Okay, if you notice that there’s a pattern to your steps being run and you wanna encapsulate that into a single function. And then being able to map over it gives you a good amount of control over how to paralyze it. And I’ll show you that in a minute, but let’s check out the output of this run. I’m not gonna run this just for time constraints right now. But if I look at the output of this run. You can see that each run is taking approximately a hundred seconds. And the total run is about 318 seconds. So if you check this in parallel, the first things you’ll notice is that the only change that I made was add .bar to the dates collection. Which changes this collection to a parallel collection. Everything else remains exactly the same. The only thing in the output is that now we can see that each of them are running in the sequential order anymore. They are pretty randomized. And the time taken for each individual run is now much higher than a hundred seconds. It’s about 175 seconds. And that is because we are now looking at sharing the cluster between all the different runs. So it makes sense as to why it’s higher. But if you look at the total time taken for the entire job, its almost more than a hundred seconds faster. And that’s kind of be cruel to the slowest job in the run. So this actually gives you a very quick demo of how you can increase your job performance significantly by running it in parallel. Although there are a few caveats to this approach, which I will discuss in the presentation after the demo. But let’s move on to the next mode of operation. And that is using Futures and Scheduler Pool. So I wanted to quickly give you a example of how we can change the level of parallelism in a concurrent chug. So if you notice out you’re by default, the driver, the amount of parallelism is equal to the number of cores on your driver node. So if my driver node had eight cores, I’m gonna get a parallelism of eight. But if you could change that, right. You could also change and tweak the amount of parallelism that you want in your job. And one way of doing that is by using this new fixed thread pool class or function and setting that to the execution context. So that gives you a good way of changing it. Since at my job, we have about three dates. I just set this to a value of three. Of course, if you’re using much larger clusters, you can change it to a more appropriate parallelism size that you have calculated. Okay. So I’m gonna actually run this cell. So while this is running, I’m going to also give you a few things that I found pretty important. So in this, I’m gonna encapsulate the about task in a Future. And I’m gonna be waiting for the result of the sequence of futures at the end. The other change I made is that I set the scheduler pool to the spark context for each individual run. So each individual run is gonna get its own separate pool. So that helps the run get a good amount of isolation from the other runs. And the cluster is able to split its resources for each of the runs. So while this is running, let’s look at the SparkUI. If you look at the stages right now, you can immediately see that the Fair Scheduler Pool. Has three more new pool names pool zero, pool one and pool two. And the minShare, the pool weight, the scheduling mode, are all defaulted to the default values. I haven’t tweaked them yet, but you can obviously change them.

让我们再次点击这个。这是为什么我想给你看。如果你看一下运行的任务,它们都是32,32和32,这正是我前面所描述的,对吧?我们能够均匀地分配资源。他们把每个运行之间的核心。这是我觉得很有用。如果你有一些工作可能会占用资源。如果你看池的运行在这里零,有点。我可以带你看看另一个演示的一部分。

实际上我想告诉你,这需要一点时间。但实际上我想告诉你,当池零并完成所持有的资源池零32内核,又被重新洗牌之间剩下的工作仍在运行。这样可以帮助你加快完成。一旦某项工作已完成。它不稳定时两个核心而不是采取新的资源可用。我认为这是我发现很好和有用的东西。

实际上,完成我的演示。我希望这是真正有用的信息。并帮助你得到一个公平的理解我们如何提高性能通过最小的调整和修改代码。让我们回到表示我们可以进一步讨论。伟大的,你们只是看到的演示,我们跑96多节点集群。我们有一个很好的了解我们可以运行相同的批处理模式,平行模式和池模式。现在让我们看看在不同的集群性能图表。所以如果你看左边的图,它显示了您,我们看到性能稳定在72核心的并行和游泳池。72除以三,我们实际上得到24,远低于32我们最初的假设。的记得早期假设我问你们与任意红线32。 So in a real world scenario, when you’re running this in parallel we actually get a performance improvement on the 32 cores. Which means that we could have actually spun up this job and seen a really good decent performance. Even with just a 24 node cluster for each partition. So if you multiply that by three, that would be about 72 cores, right? The next thing that we wanna notice is the chart on the right. So the chart on the right shows you performance improvements across the board, but we do get much high performance gains at larger clusters of like 96 and 180. If you notice a difference between running this job on a batch mode versus a parallel mode. The parallel mode shows a significant jump in performance.

咱们继续下一张幻灯片。如果你看这张幻灯片,这是我长大的非常早期的演示。这是故障和恢复处理。所以我们应该能够处理工作中的失败和重试。所以如果你看这里所示的三个任务,对,第一个任务完成。因为我们有这些工作并行运行,我们很容易能够跟踪工作的进展。如果一份工作开始和它失败了,我们不能够轻易地追踪它。所以我们应该有额外的状态管理设置。以便我们能够监控状态和跟踪工作。

说完这些,请看看我们的博客。我们将有一个工作示例GitHub的链接。看看我们的职业页面。如果你有任何进一步的问题,LinkedIn联系我或我的电子邮件地址。

看更多的火花+人工智能会话

免费试着砖
«回来
关于奥利弗•刘易斯

了印第安纳大学计算机科学硕士学位,此后我在湾区科技创业领域的金融、零售、和娱乐。实时的项目我兴奋最因为它有趣和给定信号与系统交互响应。