云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

谷歌云_阿里云事业部_安全稳定

小七 141 0

ApacheSpark在Databricks上使用Amazon Kinesis的结构化流媒体

2017年7月11日,我们宣布Apache Spark 2.2.0作为统一分析平台Databricks Runtime 3.0(DBR)的一部分正式提供。为了扩大DBR上结构化流的范围,我们支持AWS Kinesis Connector作为源(从中读取流),让开发人员可以自由地做三件事。首先,您可以选择apachekafka或Amazon的Kinesis作为读取流数据的源。第二,你不受Kinesis分析的束缚,可以使用sparksql和结构化api。最后,您可以在Unified Databricks平台上使用apachespark以及其他工作负载来编写端到端的连续应用程序。在本博客中,我们将讨论用于结构化流媒体的Kinesis连接器的四个方面,以便您可以快速开始使用数据块,并且只需进行最小的更改,就可以切换到您选择的其他流媒体源和接收器。运动数据图式配置参数AWS Kinesis认证Kinesis结构化流媒体应用剖析运动学数据模式了解您从流中读取了哪些Kinesis记录,并了解这些记录如何映射到定义的模式,可以使开发人员的工作更轻松。如果Kinesis记录映射到apachespark的数据帧,以及命名列及其关联类型,则会更好。然后,您可以从Kinesis记录中选择所需的有效负载,方法是从生成的DataFrame访问列并使用dataframeapi操作。假设您将JSON blob作为记录发送到Kinesis。要访问二进制数据负载(JSON编码的数据),可以使用DataFrame API方法as cast(data as STRING)as JsonData将二进制负载数据反序列化为JSON字符串。此外,一旦转换为JSON字符串,就可以使用from_JSON()SQL实用程序函数分解成相应的DataFrame列。因此,了解Kinesis模式以及它如何映射到数据帧,可以使流式ETL变得更容易,无论您的数据是简单的,比如单词,还是结构化和复杂的,比如嵌套的JSON。体位与理解Kinesis记录及其模式一样重要的是要知道在Kinesis连接器代码中提供正确的配置参数和选项。虽然选项很多,但值得注意的进口选项却很少:有关详细选项,请阅读Kinesis配置文档。现在,我们已经知道了从Kinesis记录派生的数据帧的格式,并了解了可以向Kinesis connector提供哪些选项来读取流,我们可以编写代码,如下所示的Kinesis流式应用程序的剖析。但是为了安全起见,我们必须首先通过AWS认证。AWS Kinesis认证默认情况下,Kinesis连接器使用Amazon的默认凭证提供者链,因此如果您为Databricks集群创建了一个IAM角色,其中包括对Kinesis的访问,那么访问权限将被自动授予。此外,根据您的IAM角色访问权限,相同的默认凭据将授予您访问aws3存储桶进行写入的权限。或者,您可以显式地将凭据作为"选项"的一部分提供给Kinesis连接器。当提供显式密钥时,使用两个"option"参数:awsAccessKey和awsSecretKey。但是,我们建议使用AWS IAM角色,而不是在生产中提供密钥。Kinesis结构化流媒体应用剖析到目前为止,我们介绍了三个概念,这些概念使我们能够使用Kinesis连接器编写结构化流媒体应用程序。一个结构化的流媒体应用程序有一个独特的解剖结构,一个连续的步骤,不管你的流源或汇。让我们学习每一步。步骤1:定义数据的模式尽管Kinesis连接器可以读取任何编码的数据,包括JSON、Avro、字节,只要你能在接收到的Spark代码中解码,对于这个blog,我们将假设我们的Kinesis流是以JSON字符串形式编码的设备数据,其模式如下。其他从pyspark.sql.types进口*pythonSchema=结构类型()\.add("电池电量",LongType())\.add("c02_level",LongType())\.add("cca3",StringType())\.add("cn",StringType())\.add("设备标识",LongType())\.add("设备类型",StringType())\.add("signal",LongType())\.add("ip",StringType())\.add("temp",LongType())\.add("timestamp",TimestampType())第二步:从你的源头阅读定义了模式之后,下一步就是使用Kinesis connector读取流。通过只指定源格式,即"kinesis",Databricks将自动使用kinesis连接器进行读取。它将处理从哪个切分读取的所有方面,并跟踪所有元数据。你不用担心。这里需要注意的是,如果我的源代码不是"kinesis",我只需将其更改为指示"kafka"或"socket",并删除AWS凭证。其他KinesDf=火花\.读流\.format("kinesis")\.option("streamName","devices")\.option("initialPosition","earliest")\.选项("region","us-west-2")\.option("awsAccessKey",awsAccessKey)\.option("awsSecretKey",awsSecretKey)\.加载()第三步:探索或改造河流一旦我们加载了数据,并且Kinesis记录现在已经映射到DataFrames,我们就可以使用SQL和DataFrames/Datasets API来处理。而底层的流引擎将确保一次语义和容错。要了解更多关于Spark Streaming如何在结构化流媒体中实现这一重要功能的信息,请查看我们的Spark峰会。其他#从有效负载中提取数据并使用转换来进行分析数据设备DF=KinesDF\.selectExpr("cast(data as STRING)jsonData")\.select(from_json("jsonData",pythonSchema).alias("devices"))\。选择("设备*")\.过滤器("设备温度>10和设备.信号>15")这一步是你的大部分分析完成的地方,你的行动见解是从中得到的。通过在这一步中使用Spark的结构化api,您可以获得Spark SQL性能的所有优点,并且不需要在单独的SDK中使用另一个SQL引擎或编程来执行ETL或流式分析。第4步:保存已转换的流最后,您可以选择将转换后的流写入S3存储桶中指定位置的拼花文件中,该位置按"日期"或"时间戳"划分。要通知Spark以确保容错,可以指定一个选项参数"checkpointLocation",底层引擎将保持该状态。其他#从有效负载中提取数据并使用转换来进行分析数据设备DF=KinesDF\.selectExpr("cast(data as STRING)jsonData")\.select(from_json("jsonData",pythonSchema).alias("devices"))\。选择("设备*")\.过滤器("设备温度>10和设备.信号>15")\#写入拼花板文件.writeStream公司\.partitionBy("时间戳")\.format("拼花地板")\#指定检查点位置.option("checkpointLocation","/parquetCheckpoint")\#存放拼花地板分区文件的位置.start("/parquetDeviceTable")这四个基本步骤封装了一个典型的结构化流应用程序的解剖结构。无论您的源是Kinesis、Kafka、socket还是本地文件系统,您都可以遵循这些指导原则来构建结构化流计算。如果您想将转换后的流写入到您自己的sink中,例如Kinesis或NoSQL,目前Spark的结构化流式处理不支持这种情况,该怎么办。通过实现foreachsink接口将数据写入Kinesis,您可以编写自己的sink功能。下一步是什么与其用展示Kinesis连接器流式应用程序的完整代码示例来搅乱这个博客,我将建议您检查和探索代码,在Kinesis上实现分布式计算WordCount的精髓"Hello World"。更好的是,您可以导入这个WordCount笔记本,并提供您的AWS凭证并继续使用它。你不需要安装或附加任何Kinesis库。无需访问外部Kinesis SDK。您只需在Databricks Runtime 3.0上编写结构化流。剩下的我们来做。如果你没有Databricks账户,今天就去买一个。阅读更多我们有一系列结构化的流媒体博客,阐述了它的许多特性,您可以参考我们的Kinesis连接器文档以及一些沉浸式阅读的结构化流式编程指南。免费试用Databricks。今天就开始吧