云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

数据库_百度云转迅雷下载方法_

小七 141 0

在apachespark的结构化流媒体中与apachekafka进行端到端的实时集成

在Databricks Community Edition中查看笔记本结构化流api支持以一致、容错的方式构建称为连续应用程序的端到端流应用程序,这种方式可以处理编写此类应用程序的所有复杂性。它这样做不必考虑流本身的细节,也不允许在sparksql中使用熟悉的概念,如数据帧和数据集。所有这些都导致了对想要利用它的用例的高度兴趣。从介绍到ETL,再到复杂的数据格式,这个主题已经有了广泛的报道。结构化流媒体还集成了第三方组件,如Kafka、HDFS、S3、RDBMS等。在本博客中,我将介绍与Kafka的端到端集成,从它获取消息,执行简单到复杂的窗口化ETL,并将所需的输出推送到各种接收器,如内存、控制台、文件、数据库,然后返回到Kafka本身。在写入文件的情况下,我还将介绍在现有分区表下写入新数据。连接到卡夫卡的话题假设您有一个可以连接到的Kafka集群,并且您希望使用Spark的结构化流来接收和处理来自主题的消息。Databricks平台已经包含了一个用于结构化流媒体的Apache Kafka 0.10连接器,因此很容易设置一个流来读取消息:在读取流时可以指定许多选项。这些选项的详细信息可以在这里找到。让我们快速看一下上面设置的streamingInputDF DataFrame的模式。它包括key、value、topic、partition、offset、timestamp和timestampType字段。我们可以根据加工需要挑选。"value"字段是实际数据,timestamp是消息到达时间戳。在窗口化的情况下,我们不应将此时间戳与消息本身可能包含的内容混淆,后者在大多数情况下更为相关。流式ETL既然流已经设置好了,我们就可以开始对它执行所需的ETL,以提取有意义的见解。请注意,streamingInputDF是一个数据帧。因为数据帧本质上是行的非类型化数据集,所以我们可以对它们执行类似的操作。假设通用的ISP hit JSON数据被推送到上面的Kafka。示例值如下所示:现在可以快速地进行有趣的分析,例如有多少用户来自zipcode,用户来自哪个ISP等。然后我们可以创建仪表板,供组织的其他成员共享。让我们深入研究一下:注意,在上面的命令中,我们能够从传入的JSON消息中解析zipcode,对它们进行分组并进行计数,所有这些都是在从Kafka主题读取数据时实时进行的。一旦我们有了计数,我们就可以显示它,它在后台启动流作业,并在新消息到达时不断更新计数。这个自动更新的图表现在可以作为Databricks中的访问控制仪表板与我们组织的其他成员共享。开窗既然我们已经连续执行parse、select、groupBy和count查询,那么如果我们希望在10分钟的窗口间隔内,以5分钟的滑动持续时间从1小时后2分钟开始计算每个邮政编码的流量,那该怎么办?在本例中,传入的JSON在"hittime"中包含时间戳,因此让我们使用它来查询每个10分钟窗口的流量。请注意,在结构化流式处理中,窗口化被视为groupBy操作。下面的饼图表示每个10分钟的窗口。输出选项到目前为止,我们已经看到最终结果是自动显示的。如果我们想在输出选项方面有更多的控制,可以使用多种输出模式。例如,如果需要调试,您可能希望选择控制台输出。如果我们需要能够在数据被消耗时交互地查询数据集,那么内存输出将是一个理想的选择。类似地,输出可以写入文件、外部数据库,甚至流式传输回Kafka。让我们详细讨论一下这些选项。记忆在这个场景中,数据存储为内存中的表。从这里,用户可以使用SQL查询数据集。表的名称是从queryName选项指定的。注意,我们继续使用上述窗口示例中的streamingSelectDF。从这里开始,您现在可以像对常规表那样在数据自动更新的同时进行更有趣的分析。慰问在这个场景中,输出被打印到控制台/标准输出日志。文件这种情况非常适合于输出的长期持久性。与内存和控制台接收器不同,文件和目录是容错的。因此,此选项需要一个检查点目录,其中维护容错状态。一旦数据被保存,就可以像在Spark中对任何其他数据集进行查询一样进行查询。文件输出接收器的另一个优点是,您可以根据列的任何变化动态地划分传入消息。在这个特定的例子中,我们可以通过'zipcode'和'day'进行分区。这有助于加快查询速度,因为只需引用单个分区就可以跳过数据块。然后我们可以按"日"按"zip"对传入数据进行分区。让我们看看输出目录。现在,分区的数据可以直接在数据集和数据帧中使用,如果创建了指向文件写入目录的表,则可以使用Spark SQL查询数据。这种方法的一个警告是,必须将一个分区添加到表中,以便表下的数据集可以访问。分区引用可以预先填充,以便在其中创建文件时,它们将立即可用。现在,您可以在将数据持久化到正确的分区中时,对自动更新的表执行分析。数据库通常我们希望能够将流的输出写入外部数据库,比如MySQL。在编写本文时,结构化流式API不支持外部数据库作为接收器;但是,当它支持时,API选项将像.format("jdbc").start("jdbc:mysql/。。"). 同时,我们可以使用foreach sink来完成这个任务。让我们创建一个自定义的JDBC接收器来扩展ForeachWriter并实现其方法。我们现在可以使用jdbsink:当批处理完成后,可以根据需要将counts by zip插入/向上插入到MySQL中。卡夫卡与写入数据库类似,当前的结构化流媒体API不支持"kafka"格式,但它将在下一个版本中提供。同时,我们可以创建一个名为KafkaSink`的自定义类,它扩展了\u ForeachWriter。让我们看看这是什么样子:现在我们可以使用writer:您现在可以看到,我们正在将消息发送回Kafka主题。在这种情况下,我们正在推送更新zipcode:计数每批的结束。另一件要注意的事情是,流式仪表板提供了对传入消息与处理速率、批处理持续时间和用于生成它的原始数据的洞察。这在调试问题和监视系统时非常方便。在卡夫卡消费者方面,我们可以看到:在本例中,我们在"更新"输出模式下运行。当消息被消费时,在该批处理期间更新的zipcode被推回到Kafka。不会发送未更新的zipCode。您也可以在"完成"模式下运行,就像我们在上面的数据库接收器中所做的那样,在这种模式下,所有具有最新计数的zipcode都将被发送,即使某些zipcode计数自上一批处理以来没有改变。结论在较高的层次上,我介绍了与Kafka的结构化流媒体集成。此外,我还演示了如何使用api使用各种接收器和源。需要注意的一点是,我们在这里所经历的与其他流(socket、directory等)同样相关。例如,如果您希望使用一个socket源并将处理过的消息推送到MySQL,那么这里的示例应该能够通过更改流来实现这一点。此外,显示ForeachWriter的示例可以用于将写入分散到多个下游系统。我计划在后面的文章中更详细地介绍扇形和水槽的更深入的见解。我们在这个博客中使用的示例代码可以作为Databricks笔记本使用。你可以通过注册一个免费的Databricks Community Edition帐户开始尝试结构化流媒体。如果您有任何问题,或想开始使用Databricks,请与我们联系。最后,我鼓励您阅读我们关于结构化流媒体的系列博客:apachespark2.1中使用结构化流的实时流ETL在ApacheSpark2.1中使用结构化流处理复杂的数据格式免费试用Databricks。今天就开始吧