投资数据平台现代化战略bob体育客户端下载
2021年1月29日 在工程的博客
2020年,个人和机构投资者的投资胃口都达到了历史高点。一项研究表明“在新冠肺炎引发的波动之后,散户交易员占股市的近25%”.此外,机构投资者对加密货币进行了大量投资,其中36%投资于加密货币,正如《商业内幕》所概述的那样.随着投资者获得和交易加密货币等替代资产,交易量飙升,并带来了新的数据挑战。此外,尖端研究已不再局限于华尔街的机构投资者——如今的投资世界已扩展到硅谷的数字交易所、以数据为中心的做市商,以及越来越多地投资于为投资者提供人工智能工具的零售经纪商。数据湖已经成为构建金融数据产品和研究的标准,但它们也面临着一系列独特的挑战:
- 缺乏如何在云端建立企业数据湖的蓝图
- 组织仍在努力保证其数据的可靠性和及时性,导致次优流程和稀释的见解
因此,由于维护成本高,以及缺乏规模和交易盈利能力的蓝图,难以实现可扩展的人工智能(例如波动率预测)。作为我们建议的蓝图的一部分,我们建议在Delta Lake上进行标准化,这是一个开源存储层,它将ACID事务带到Apache Spark™和大数据工作负载。Delta Lake中的表既是批处理表,也是流源和流汇。流式数据摄取、批处理历史回填和交互式查询都可以开箱即用。特别是,由于原始市场数据是实时交付的,必须近实时地使用来支持交易决策,因此Delta Lake对于支持交易用例至关重要。
这个博客有两个主要部分。第一份报告详细介绍了将金融市场数据输入Delta Lake的方案。第二部分涵盖了产品化用例的蓝图,例如金融产品波动预测以及Delta Lake的市场监视。值得注意的是,作为用例的一部分,我们介绍了一个由Databricks Labs开发的开源时间序列包,它有助于为上面的用例构建基础。
如何打造市场三角洲湖
在本博客中,通过一系列设计模式和实际示例,我们将解决上一节中的数据挑战。作为一个视觉指南,下面的参考体系结构将是如何构建数据湖源和为最终报告、交易摘要和市场监控警报策划数据集的基础。
基本数据源摄取
基本数据(位于图1的左上方)被宽泛地定义为用于衡量公司内在价值的经济和财务因素,目前可以从大多数财务数据供应商获得。最常见的两种来源包括Factset而且标准普尔市场情报平台bob体育客户端下载.这两个源都可以通过FTP、API和SQL Server数据库提供数据。由于数据可以通过数据库进行因子分析,因此有三个简单的选择可以摄入到三角洲湖:
选项1 -合作伙伴摄入网络
Databricks与六家公司合作,组成了“合作伙伴数据摄取网络”。bob体育外网下载我们的合bob体育外网下载作伙伴能够从各种来源摄取数据,包括FTP、crm、营销源和数据库源。由于金融供应商允许金融客户托管数据库,因此可以使用我们的合作伙伴工具提取数据,直接存储在Delta Lake中。关于如何使用该网络摄入的完整文档和合作伙伴的列表位于Databricks的文档中,bob体育外网下载合作伙伴数据集成.
选项2 -使用原生的基于云的摄取工具
云服务提供商也有将数据库复制到Delta Lake的现有工具。下面是从数据库(本地或云)摄取到Delta Lake的两个选项。
AWS
AWS提供了一种解决方案,即数据库迁移服务(Database Migration Services),它允许组织建立变更数据捕获(CDC)流程,将数据库更改复制到云数据湖。我们在博客中概述了将数据库更改复制到Delta Lake的具体方法,“使用AWS DMS将事务性数据迁移到Delta Lake。”例如,由于Xpressfeed标准普尔数据有数百个来源,从ESG风险评分和替代数据到基本收益和新闻情绪数据集,因此将这些数据自动复制到Delta Lake的方法至关重要。上面提到的AWS解决方案提供了一种简单的设置方法。
Azure
Azure最受欢迎的服务之一是Azure数据工厂(ADF),这是有充分理由的。ADF允许从许多不同的数据源进行复制,包括数据库、FTP甚至跨云数据源(如BigQuery)。具体来说,有两种方法可以将数据从SQL数据库写入Delta Lake:
- ADF提供了一个简单的”复制到的工厂它只是简单地将数据库表复制到blob存储(blob或ADLS Gen2), Delta Lake是这个复制功能的有效目标表。
- 对于从数据库到Delta Lake的更自定义转换,ADF足够灵活,可以使用如下所示的信息模式从数据库读取所有表在这里.从这里,您可以简单地配置Databricks笔记本,它使用来自信息模式的输入表名,并通过执行Databricks笔记本来复制每个表,后者使用JDBC从数据库中读取数据。的例子是在这里.
基于api的数据源摄取
彭博社是市场数据、参考数据和数百个其他feed的行业标准之一。为了展示一个来自彭博数据订阅的基于API的摄取(图1中左)的示例,B-PIPE(用于访问市场数据源的彭博数据API)模拟器将被使用。下面的代码修改了原始模拟器中的Java市场数据订阅客户端代码,以便使用AWS SDK将事件发布到Kinesis实时流中。
将B-PIPE市场数据写入流媒体服务
//开放市场数据订阅服务session.openServiceAsync(“/ / blp / mktdata”,新CorrelationID (-9999年));//创建列表的证券来不断摄取成三角洲湖SubscriptionList slist=新SubscriptionList ();slist.add (新订阅("SPY US EQUITY", runmarketdatasubscript ._fields));slist.add (新订阅("AAPL 150117C00600000股权",runmarketdatasubscript ._fields));slist.add (新订阅("AMD美国股票",runmarketdatasubscript ._fields));session.subscribe (slist)//内循环通过连续流的消息从B-管道市场数据订阅//使用Kinesis客户端来检索到的写记录从API来运动流AmazonKinesisClient kinesisClient=新AmazonKinesisClient (新BasicAWSCredentials (,));字符串kinesisEndpointUrl=“https://kinesis.us -东- 1. - amazonaws.com”;字符串regionName=“us-east-1”;kinesisClient.setEndpoint (kinesisEndpointUrl);//创建PutRecordRegust与字节从API请求(输出)而且包括序列号PutRecordRequest PutRecordRequest=新PutRecordRequest ();putRecordRequest。setStreamName(" databicks -bpipe");putRecordRequest.setData (ByteBuffer。wrap(output.getBytes()));putRecordRequest。setPartitionKey(" mpt -data- partitionkey ");putRecordRequest。setsequencenumberforordered (sequenceNumberOfPreviousRecord);PutRecordResult PutRecordResult=kinesisClient。putRecord(putRecordRequest);sequenceNumberOfPreviousRecord=putRecordResult.getSequenceNumber ();
从运动流写入数据到三角洲湖
薇尔运动=spark.readStream.format(“运动”).option(“streamName”、“databricks-bpipe”).option(“地区”、“us-east-1”).option(“initialPosition”、“最新”).load ()val df=运动.withColumn(“mktdata坳(“数据”)。投(“字符串”)).withColumn(“event_ts”,分裂(坳(“mktdata”)、“,”)(0)).withColumn(“股票”,分裂(分裂(坳(“mktdata”)、“,”)(1), " ")(1)).withColumn(“quote_pr”,翻译(分裂(坳(“mktdata”)、“,”)(2), "$", "")).withColumn(“event_dt坳(“event_ts”)。投(“时间戳”)。投(“日期”)
df.writeStream.partitionBy(“event_dt”).format(“δ”).option(“路径”、“/ tmp /砖/ bpipe”).option(“checkpointLocation”、“/ tmp /砖/ bpipe_cp”).开始()
转换和读取记录
显示器(spark.read。格式(“δ”) .load (“/ tmp /砖/ bpipe”))
标记数据源摄取
标记数据(位于图1的左下角)是高分辨率日内市场数据的总称,通常来自数据供应商,以CSV、JSON或二进制格式作为批处理源。计时数据的类型包括交易、报价和合同数据,交付的示例是Tick数据历史服务由汤森路透提供。从这些来源连续将数据输入Delta Lake的最简单方法是建立Databricks自动装卸机从存储桶读取数据并将数据重定向到单独的Delta Lake表。从这里开始,各种ETL流程可以将每种消息类型整理到精炼的或聚合的Delta表中。自动装弹机的好处有两方面:
- 从Delta Lake继承的可靠性和性能
- 由于底层使用SQS (AWS)或AQS (Azure)以避免重新列出输入文件,以及托管检查点以避免手动选择最新未读文件,从而降低了成本。
从Delta Lake到金融服务用例产品化
除了在构建任何数据平台时所面临的数据收集挑战之外,投资管理公司越来越需要解决将人工智能纳入产品套件的问题,以及管理功能工程的成本。bob体育客户端下载特别是:
- 零售和机构投资公司也需要能够以经济有效的方式在数据湖上查询和运行ETL,并最大限度地减少与丰富和查询数据湖相关的维护成本.
- 散户投资者期待在订阅中提供人工智能支持的产品和见解。最佳解决方案将以这样的方式托管人工智能基础设施,用户可以创建人工智能驱动的应用程序和仪表板,其中将花在设置库和弹性计算基础设施上的时间降至最低,并且底层处理可以扩展到每天从事务数据源(客户事务和引号等)获得的数十亿个数据点。
现在,我们已经提出了将金融数据集登陆云数据湖的可靠、有效的方法,我们希望解决云和人工智能产品中金融数据集之间的一些现有差距。
上图显示了数据集和筒仓式基础设施是如何不足以交付投资分析产品。大多数fsi已经采用了右侧几乎所有的AI用例启用器,但未能最大限度地扩大这些与核心数据集的体积加权重叠。Databricks统一数据分析平台包含了前四个开箱即用的AI用例。bob体育客户端下载为了使生产更加具体,我们将展示如何使用新的Databricks开源包tempo大规模地操作时间序列。然后,我们将深入研究以下使用tempo的用例特性创建模板,并展示如何在上面的维恩图中获得两个世界的最佳效果。
- 零售投资详细使用基本数据来告知每日波动率预测。
- 市场监测详细说明总结价格改进和检测欺骗的过程。
节拍-时间序列包
在金融服务中,时间序列无处不在,我们发现我们的客户很难大规模地操纵时间序列。在过去,我们已经概述了一些方法扩展时间序列查询。现在,Databricks Labs发布了一套简单的时间序列实用程序,使时间序列处理变得更简单节奏.这个包包含实用程序来做以下工作:
- AS OF将多达数百万个不规则时间序列合并在一起
- 通过滚动聚合现有指标来创建功能
- 优化的写入Delta Lake非常适合临时时间序列查询
- 量加权平均价格(VWAP)计算
- 重采样
- 指数移动平均计算
通过结合tick数据的多功能性、可靠的数据管道和tempo等开源软件,组织可以以最低的成本和快速的执行周期从各种用例中释放指数级价值。bob下载地址下一节将介绍资本市场中利用节奏的两个反复出现的主题:波动预测和市场监督。
波动率预测方法与基本和技术数据
标普全球市场情报提供基本数据可以使用称为Xpressfeed的机制(本指南前面介绍过)来摄取。关于这个提要的一些要点是:
- 它涵盖了数千个基本数据指标
- 它涵盖了数十万只全球上市和非上市股票
- 报告频率是每天-有一个归档日期,可以用于时间点分析
虽然我们不涵盖对tick ETL的管理过程(联系Databricks)销售关于这个用例的更多信息),我们概述了使用tempo库从标准节拍格式到最终预测对象的处理过程;我们的实现在本博客底部报告的链接中。高层细节如下:
- 创建时间点日历-将最新的基本数据合并到最新的日历日期,使用提交日期(基本数据点提交日期为交易日)。通常称为as - of连接,这种操作通常成本很高,并且在高度不平衡的数据集中受到技术瓶颈的影响。Tempo将保证该操作均匀分布,以最大限度地利用云弹性(及其相关成本)。
- 创建对等组-使用有意义的基本数据项目,如每股收益、股本回报率、浮动百分比(代表股东持股),根据每个指标形成对等组。请注意,这里需要对数据项值进行旋转,以执行有意义的特性工程。
- 重新采样tick数据精确到小时(或所需的任何粒度)。之所以选择以小时为单位,是因为每日聚合不能提供足够的粒度来很好地预测波动。
- 预测市场波动在Databricks使用运行时进行机器学习。
- 综合预测结果根据所评估的证券找出最大/最小波动率公司。
该数据架构值得注意的一个方面是创建黄金预测表时的最后转换。特别是,
- 我们已经将ML作为特征工程处理的一部分。这意味着我们应该将CI/CD作为ML治理的一部分。在这里是一个模板,完成这在完全严格。
- 我们选择强调gpu在预测波动率方面的重要性。在本博客最后的笔记本示例中,我们选择使用xgboost和各种报价指标的简单范围统计数据作为我们的功能的一部分。通过利用gpu_hist树方法和完全托管的GPU集群和运行时,我们可以节省2.65倍的成本(和2.27倍的运行时),为数据团队展示了硬成本的降低和生产力的节省。这些指标是根据美国主要交易所6个月的tick数据获得的。
最终,在节奏和Databricks运行时机器学习在美国,零售经纪公司可以通过使用人工智能技术统一基本面和技术分析的仪表板为客户提供服务。以下是我们同行预测的结果。
使用蜱虫数据的市场监测方法
市场监管是金融服务生态系统的重要组成部分,旨在减少市场操纵,提高透明度,并对各种资产类别实施基准规则。一些有广泛监督计划的政府和私人组织的例子包括纳斯达克、FINRA、CFTC和CME集团。随着散户投资行业不断壮大,新投资者和缺乏经验的投资者(源),特别是在数字货币领域,了解如何建立一个基本的监督程序,减少金融欺诈,增加市场波动、风险和最佳执行等领域的透明度是很重要的。在下一节中,我们将展示如何构建基本的价格改进摘要,以及将基本的欺骗实现组合在一起。
价格提高
价格提升指的是经纪人提供给客户的买入价(在卖单的情况下)或卖出价(在买单的情况下)的提升量。这对一个零售经纪人来说很重要,因为它通常有助于一个经纪人的感知质量,如果它不断地为客户节省一组交易的钱。价格改善的基本概念是:
- Maria在上午10点向XYZ股票下了100股的市场订单,最佳出价/卖出价为10美元/ 11美元
- 经纪人A将订单发送到交易所,以获得每股10.95美元的执行价格
- 在这次执行中节省了$0.05 * 100 = $5.00,这代表了一些适度的价格改进
尽管改善很小,但随着时间的推移,这些节省可以累积到数百笔交易中。一些经纪人在应用程序中显示这些信息也是为了透明度,并展示通往适当的市场中心或做市商以获得好价格的能力。
计算价格改进
价格改善确实是一种特殊情况滑动(订单到达时执行价格与最佳买入价/卖出价之间的变动幅度)。它对数字货币的影响与传统股票一样大,可以说影响更大,因为数字货币的波动性和订单量波动很大。例如,这里有一些对金融市场深度和滑坡的洞察。下面是如何使用tempo计算滑移的基本蓝图(详细代码可在附件笔记本中获得):
- 摄取市场订单信息(已下的订单)
- 摄取执行消息
- 使用tempo执行AS OF连接到订单到达时间
- 使用tempo执行AS OF连接到执行时间
- 衡量执行价格和订单到达时的买入价/卖出价之间的差异
- 根据公司进行总结,并在SQL分析和/或BI仪表板中提供
从内部系统或OMS(订单管理系统)获取订单簿数据以获取订单和执行,通常以JSON或其他平面文件格式提供。一旦该数据可用,AS OF连接就会按照官方tempo文档中描述的那样对一对数据帧进行操作在这里:
下面我们将显示执行连接的代码。
从tempo.tsdf进口TSDF交易= spark.table(“exchange_trades”)trades_tsdf = TSDF(交易,ts_col =“event_ts”, partition_cols = [“日期”,“股票”])quotes_tsdf = TSDF(“tempo.delta_tick_quotes_6mo”), ts_col =“event_ts”, partition_cols = [“日期”,“股票”])Ex_asof = trades_tsdf。asofJoin(quotes_tsdf, right_prefix = .“asof_ex_time”)orders_tsdf = TSDF(ex_asof。Df, ts_col =“order_arrival_ts”, partition_cols = [“日期”,“股票”])Order_asof = ex_asof。asofJoin(quotes_tsdf, right_prefix = .“asof_ord_time”)
order_asof \.df \.write \.格式(“δ”) \.mode (“覆盖”) \.saveAsTable (“tempo.silver_trade_slippage”)
一旦这些数据在Delta Lake可用,就可以以各种方式切片,以得到那些有明显滑移的证券的摘要。请参阅下面的示例,该示例总结了交易日某段时间内的总滑动量的日志。
欺骗
欺骗指的是一种市场操纵模式,涉及人为的兴趣进入(通过虚假的订单投放),然后在对面执行,以利用受原始人为兴趣错误影响的最佳买入价/卖出价变化。事件的欺骗顺序通常还包括取消订单—下面我们概述一个简单的示例。
欺骗是数百种不同的市场操纵技术之一,发生在许多不同的资产类别中。特别是,它一直是大多数股票市场监控计划的一部分,但由于比特币和以太币等数字货币需求的增加,它的重要性越来越高。事实上,由于加密货币的波动性如此之大,保护客户免受潜在的欺骗活动,以确保对加密平台(无论是交易所还是DeFi框架)的信任至关重要。bob体育客户端下载
样本模式
检测欺骗的步骤序列也适用于其他操作模式(例如,提前运行、分层等),因此我们概述了一种简单的方法来突出一些底层技术。
- 保存订单放置信息-键上的订单ID和序列号
- 保存所有订单的取消信息(附带ORDER ID)
- 记录订单到达时的NBBO(下面数据中的order_rcvd_ts)以及订单到达前的NBBO,加入订单和取消(寻找完全取消的订单),并记录以下形式的序列:
- 限价下单前几秒的NBBO变化(对于卖出订单,最佳报价减少)
- 下单时取消(我们将虚假订单称为非真诚订单)
- 执行对面的订单放置上面
- 同一市场参与者的清洗交易(自我交易)活动(或者作为细微差别,这可以表示同一CRD下的不同mpid)
示例模式捕捉NBBO(引用作为代理)信息使用的速度as连接:
从tempo.tsdf进口TSDForders_and_cncls_tsdf=TSDF (orders_and_cncls ts_col=“prior_order_rcvd_ts”, partition_cols=(“日期”、“股票”))prior_quotes_tsdf=TSDF (prior_quotes ts_col=“event_ts”, partition_cols=(“日期”、“股票”))prior_order_asof=orders_and_cncls_tsdf。asofJoin (prior_quotes_tsdf right_prefix=“asof_prior_order”)prior_order_asof=TSDF (prior_order_asof。df, ts_col=“order_rcvd_ts”, partition_cols=(“日期”、“股票”))order_asof=prior_order_asof。asofJoin (prior_quotes_tsdf right_prefix=“asof_order”)nbbo_deltas=order_asof。df \.withColumn(“nbbo_ask_delta_direction”符号(坳(“asof_prior_order_ASK_PRICE”)-坳(“asof_order_ASK_PRICE”)))\.withColumn(“nbbo_bid_delta_direction”符号(坳(“asof_order_BID_PRICE”)-坳(“asof_prior_order_BID_PRICE”)))\.withColumn(“nbbo_ask_delta”,腹肌(坳(“asof_prior_order_ASK_PRICE”)-坳(“asof_order_ASK_PRICE”)))\.withColumn(“nbbo_bid_delta”,腹肌(坳(“asof_order_BID_PRICE”)-坳(“asof_prior_order_BID_PRICE”)))
下面,我们将几个样本订单的NBBO向下运动可视化,这验证了我们在NBBO变化中寻找的模式。
最后,我们避免了一些公司的报告,这些公司的非真诚执行恰好与一些洗账交易活动相吻合。
结论
在这个蓝图中,我们关注的是将公共数据集导入Delta Lake,以及在Delta Lake对象上生成管道的策略。利用Delta Lake使金融服务机构能够专注于为客户提供产品,最终导致资产管理规模的增加,金融欺诈的减少,以及随着投资世界扩展到越来越多的散户投资者而增加的订阅量。从技术角度来看,以上所有用例都是通过现代数据架构的核心原则,以及新发布的tempo库的帮助而实现的:
- 支持开源包,并与业界接受的框架集成
- AI用例的基础设施支持
- 特性创建模板
- 时间序列分析支持
我们已经记录了这些方法,并在下面的笔记本链接中为一些流行的用例提供了功能创建模板。此外,我们还介绍了tempo及其在这些模板中的应用,作为投资数据平台的基础。bob体育客户端下载
试试下面的笔记本在Databricks加速你的投资平台bob体育客户端下载联系我们了解更BOB低频彩多关于我们如何帮助具有类似用例的客户的信息。