云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

网站服务器_ip服务器_排行榜

小七 141 0

apachespark1.1:将Hadoop输入/输出格式引入PySpark

这是Graphflow的nickpentreath和IBM的Kan Zhang的客座帖子,他们为apachespark1.1提供了Python输入/输出格式支持。apachespark的两个强大特性包括Scala、Java和Python提供的本地api,以及它与任何基于Hadoop的输入或输出源的兼容性。这种语言支持意味着即使没有Scala经验,用户也可以快速熟练地使用Spark,而且还可以利用大量可用的第三方库(例如,Python的许多数据分析库)。内置的Hadoop支持意味着Spark可以与任何实现Hadoop InputFormat和OutputFormat接口的数据存储系统或格式"开箱即用",包括HDFS、HBase、Cassandra、Elasticsearch、DynamoDB等,以及SequenceFiles、Parquet、Avro等多种数据序列化格式,节约和协议缓冲区。以前,hadoopinputformat/OutputFormat支持只在Scala或Java中提供。要在Python中访问这些数据源,而不是简单的文本文件,用户需要首先用Scala或Java读取数据,然后将其作为文本文件写出,以便在Python中再次读取。随着apachespark1.1的发布,Python用户现在可以直接从任何与Hadoop兼容的数据源读写数据。例如:读取序列文件SequenceFile是Hadoop的标准二进制序列化格式。它存储可写键值对的记录,并支持拆分和压缩。sequencefile是一种常用的格式,特别是在Map/Reduce管道中的中间数据存储中,因为它们比文本文件更有效。Spark长期以来支持使用SparkContext实例上可用的sequenceFile方法本机读取sequenceFile,它还利用Scala特性允许在方法调用参数中指定键和值类型。例如,要在Scala中读取带有文本键和双写值的SequenceFile,我们将执行以下操作:价值rdd=sc.sequenceFile文件[字符串,双精度](路径)Spark负责自动将文本转换为字符串,并将DoubleWritable转换为Double。新的PySpark API功能在Python SparkContextinstance上公开了一个sequenceFile方法,该方法的工作方式大致相同,默认情况下会推断出键和值类型。PySpark RDD上的saveAsSequenceFile方法允许用户将键值对的RDD保存为SequenceFile。例如,我们可以从Python集合创建RDD,将其另存为SequenceFile,然后使用以下代码片段将其读回:rdd=sc.并行化([("键1",1.0),("键2",2.0),("键3",3.0)])rdd.saveAsSequenceFile文件('/tmp/pysequencefile/')…sc.sequenceFile文件('/tmp/pysequencefile/').collect()[(u'key1',1.0),(u'key2',2.0),(u'key3',3.0)]引擎盖下面这个特性是在现有Scala/javaapi方法的基础上构建的。为了在Python中工作,需要有一个桥来将HadoopInputFormats生成的Java对象转换为可以序列化为PySpark可用的pickled Python对象(反之亦然)。为此,我们引入了一个转换器特性,以及一对处理标准Hadoop可写文件的默认实现。自定义Hadoop转换器虽然默认转换器处理最常见的可写类型,但用户需要为自定义可写文件或不生成可写文件的序列化框架提供自定义转换器。为了说明这一点,apachespark1.1示例子项目中包含了一些用于HBase和Cassandra的额外转换器以及相关的PySpark脚本。一个更详细的例子:Avro的定制转换器对于那些想深入研究的人,我们将以apacheavro序列化格式为例,展示如何编写更复杂的自定义PySpark转换器。需要考虑的一件事是转换器将获得什么样的输入数据。在我们的例子中,我们打算使用AvroKeyInputFormat的转换器,并且输入数据将是Avro记录包装在AvroKey中。因为我们要处理所有3个Avro数据映射(Generic、Specific和Reflect),对于每个Avro模式类型,我们需要处理这些映射产生的所有可能的数据类型。例如,对于Avro BYTES类型,通用和特定映射都会输出java.nio.ByteBuffer,而反射映射输出数组[Byte]。所以我们的unpackBytes方法需要处理这两种情况。def unpackBytes(obj:Any):数组[Byte]={val字节:数组[Byte]=对象匹配{案例buf:java.nio.ByteBuffer=>;buf.数组()case arr:Array[Byte]=>;arrcase other=>抛出新的SparkException(s"未知字节类型${其他.getClass.getName}")}val bytearray=新数组[字节](字节.长度)系统阵列复制(字节,0,字节,0,字节.长度)再见}另一件要考虑的事情是转换器将输出哪些数据类型,或者等效地,PySpark将看到哪些数据类型。例如,对于Avro数组类型,反射映射可以生成基元数组、对象数组或java.util.Collection取决于它的输入。我们把这些都转换了tojava.util.Collection,然后序列化为Python列表的实例。def unpackArray(对象:任何,架构:schema):java.util.Collection[任何]=对象匹配{案例c:JCollection[\u]=>;c、 地图(来自AVRO(\,schema.getElementType))case arr:Array[\u]如果arr.getClass.getComponentType.isPrimitive=>;arr.toSeq公司case arr:Array[\u]=>;排列图(来自AVRO(\,schema.getElementType)).toSeq公司case other=>抛出新的SparkException("未知数组类型"${其他.getClass.getName}")}最后,我们需要处理嵌套的数据结构。这是通过在单个unpack*方法和Avro中央交换机之间递归调用来完成的,后者处理所有Avro模式类型的调度。def fromAvro(obj:Any,schema:schema):任意={如果(obj==null){返回空}架构.getType匹配{case UNION=>unpackUnion(obj,schema)case ARRAY=>unpackArray(obj,schema)大小写固定=>解包固定(obj,schema)case-MAP=>unpackMap(obj,schema)大小写字节=>解包字节(obj)>解包记录(obj记录)大小写字符串=>目标字符串案例枚举=>目标字符串case NULL=>对象大小写布尔值=>obj双格=>目标箱浮动=>目标case INT=>目标箱长=>目标case other=>抛出新的SparkException("未知的Avro架构类型${其他.getName}")}}avrowrapertojavaconverter的完整源代码可以在Spark示例中找到AvroConverters.scala公司,而使用转换器的相关PySpark脚本可以在这里找到。结论和今后的工作我们很高兴在1.1版本中将这个新特性引入PySark,并期待着看到用户如何利用内置功能和自定义转换器。电流转换器接口的一个限制是无法设置自定义配置选项。未来的改进可能是允许转换器采用Hadoop配置,该配置允许在运行时进行配置。免费试用Databricks。今天就开始吧