云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

免备案CDN_阿里云企业邮箱官网_免费1年

小七 141 0

如何在ApacheSpark2.0中使用SparkSession

在数据库里试试这个笔记本通常,会话是两个或多个实体之间的交互。用计算机术语来说,它在因特网上联网的计算机领域的使用是突出的。首先是TCP会话,然后是login session,然后是HTTP和user session,所以现在我们有了apachespark中引入的SparkSession。除了有时间限制的交互之外,SparkSession还提供了与底层Spark功能交互的单入口点,并允许使用DataFrame和Dataset api编程Spark。最重要的是,它限制了开发人员在与Spark交互时必须处理的概念和构造的数量。在这个博客及其附带的Databricks笔记本中,我们将探讨spark2.0中的SparkSession功能。探索SparkSession的统一功能首先,我们将研究一个Spark应用程序SparkSessionZipsExample,它从JSON文件读取邮政编码,并使用dataframesapi进行一些分析,然后发出Spark SQL查询,而不访问SparkContext、SQLContext或HiveContext。创建SparkSession在Spark的早期版本中,必须创建SparkConf和SparkContext才能与Spark交互,如下所示://设置spark配置并创建上下文val sparkConf=new sparkConf().setAppName("SparkSessionZipsExample").setMaster("local")//SparkContext的句柄来访问其他上下文,比如SQLContextval sc=新的SparkContext(sparkConf)。设置("spark.some.config.option","一些值")val sqlContext=新建org.apache.spark网站.sql.SQLContext(sc)而在Spark 2.0中,同样的效果可以通过SparkSession实现,而不必明确地创建SparkConf、SparkContext或SQLContext,因为它们封装在SparkSession中。如果SparkSession对象尚不存在,则使用builder设计模式实例化SparkSession对象及其关联的底层上下文。//创建SparkSession。不需要创建SparkContext//你会自动得到它作为SparkSession的一部分val warehouseLocation="文件:${系统:user.dir}/火花仓库"val spark=火花会话.builder().appName("SparkSessionZipsExample").config("spark.sql.warehouse.dir",仓库位置).enableHiveSupport().getOrCreate()此时,您可以使用spark变量作为实例对象,以便在spark作业期间访问其公共方法和实例。配置Spark的运行时属性一旦SparkSession被实例化,就可以配置Spark的运行时配置属性。例如,在这个代码片段中,我们可以更改现有的运行时配置选项。因为configMap是一个集合,所以可以使用Scala的所有iterable方法来访问数据。//设置新的运行时选项spark.conf.set("spark.sql.shuffle.隔板",6)spark.conf.set("火花执行器内存","2g")//获取所有设置瓦尔配置映射:映射[字符串,字符串]=spark.conf.getAll()访问目录元数据通常,您可能希望访问和仔细阅读底层目录元数据。SparkSession将"catalog"公开为一个公共实例,该实例包含与metastore(即datacatalog)一起工作的方法。因为这些方法返回一个数据集,所以您可以使用数据集API来访问或查看数据。在这个片段中,我们访问表名和数据库列表。//从目录中获取元数据数据spark.catalog.list数据库显示(错误)spark.catalog.listTables公司。显示(错误)图1。从目录返回的数据集创建数据集和数据帧使用SparkSessionAPI创建数据帧和数据集有多种方法生成数据集的一种快速方法是使用火花射程方法。当学习如何使用它的API操作数据集时,这个快速方法被证明是有用的。例如,//使用创建数据集火花射程从5到100,增量为5数值=火花射程(5100,5)//颠倒顺序并显示前5项numDS.orderBy(desc("id")。显示(5)//计算描述性统计并显示它们numDs.描述()。显示()//使用创建数据帧spark.createDataFrame从列表或序列中值langPercentDF=spark.createDataFrame(列表(("Scala",35),("Python",30),("R",15),("Java",20)))//重命名列值lpDF=langPercentDF.withColumnRenamed("_1","language")。列重命名为("\u 2","percent")//按百分比降序排列DataFramelpDF.orderBy公司(desc("percent"))。显示(false)图2。数据帧和数据集输出使用SparkSession API读取JSON数据像任何Scala对象一样,您可以使用spark,SparkSession对象来访问它的公共方法和实例字段。我可以读取JSON、CVS或TXT文件,也可以读取拼花板表。例如,在这个代码片段中,我们将读取邮政编码的JSON文件,它返回一个DataFrame,一个通用行的集合。//读取json文件并创建dataframeval jsonFile=参数(0)值zipsDF=spark.read.json(jsonFile)//过滤所有人口>40K的城市zipsDF.过滤器(zipsDF.col公司("pop")>40000)。显示(10)在SparkSession中使用Spark SQL通过SparkSession,您可以像通过SQLContext一样访问sparksql的所有功能。在下面的代码示例中,我们创建了一个表,针对该表发出SQL查询。//现在创建一个SQL表并对其发出SQL查询//使用sqlContext,但通过SparkSession对象。//创建数据帧的临时视图zipsDF.createOrReplaceTempView("拉链表")zipsDF.cache()val结果DF=火花.sql("从zips_table中选择城市、pop、state、zip")结果df.show(十)图3 Spark作业运行的部分输出使用SparkSession保存和读取配置单元表接下来,我们将创建一个配置单元表,并使用SparkSession对象对其发出查询,就像使用HiveContext一样。//删除表(如果存在)以绕过现有表错误火花.sql("如果存在则删除表zips\u hive\u TABLE")//另存为配置单元表火花表("拉链表")。write.saveAsTable("zips配置单元表")//对配置单元表进行类似的查询val结果shivedf=火花.sql("从zips配置单元表中选择城市、pop、state、zip,其中pop>40000")结果hivedf.show(十)图4。配置单元表的输出正如您所观察到的,使用dataframeapi、sparksql和配置单元查询运行的结果是相同的。其次,让我们把注意力转向两个Spark开发环境,其中SparkSession是自动为您创建的。Spark REPL和Databricks笔记本中的SparkSession首先,与Spark的早期版本一样,Spark shell创建了SparkContext(sc),因此在Spark 2.0中,Spark shell创建了SparkSession(Spark)。在这个spark shell中,可以看到spark已经存在,并且可以查看它的所有属性。图5。火花壳中的火花会话其次,在Databricks笔记本中,当您创建集群时,SparkSession将为您创建。在这两种情况下,都可以通过一个名为spark的变量访问它。通过这个变量,你可以访问它的所有公共字段和方法。我不想在这里重复相同的功能,而是请您检查一下笔记本,因为每个部分都会探讨SparkSession的功能等。图6。Databricks笔记本中的SparkSession通过对邮政编码数据进行一些基本分析,您可以在Databricks笔记本SparkSessionZipsExample中探索上述示例的扩展版本。与上面的Spark应用程序示例不同,我们不创建SparkSession,因为它是为我们创建的,但使用了它所有公开的Spark功能。要试用此笔记本,请将其导入Databricks。SparkSession封装SparkContext最后,对于历史上下文,让我们简要了解一下SparkContext的底层功能。图7。SparkContext,因为它与驱动程序和集群管理器有关如图所示,SparkContext是访问所有Spark功能的管道;每个JVM只存在一个SparkContext。Spark驱动程序使用它连接到集群管理器进行通信,提交Spark作业,并知道要与哪个资源管理器(YARN、Mesos或Standalone)通信。它允许您配置Spark配置参数。通过SparkContext,驱动程序可以访问其他上下文,比如SQLContext、HiveContext和StreamingContext来编程Spark。然而,在Spark 2.0中,SparkSession可以通过一个统一的入口点访问上述Spark的所有功能。除了使访问DataFrame和Dataset api更简单外,它还包含底层上下文来操作数据。总之,我在这个博客中展示的是,Spark早期版本中以前通过SparkContext、SQLContext或HiveContext提供的所有功能现在都可以通过SparkSession获得。从本质上讲,SparkSession是使用Spark操作数据的单一统一入口点,从而最大限度地减少要记住或构造的概念的数量。因此,如果要处理的编程结构更少,则更可能犯更少的错误,代码也可能更少混乱。下一步是什么?这是关于Spark 2.0中引入的新特性和功能以及如何在Databricks just time数据平台上使用它们的系列文章中的第一篇。在接下来的几周里,请继续关注其他how-to博客。试试随附的SparkSessionZipsExample笔记本在我的github repo上尝试相应的Spark应用程序尝试另一个SparkSession笔记本今天在数据库里免费导入这些笔记本免费试用Databricks。今天就开始吧