云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

大带宽_廊桥遗梦百度云_怎么买

小七 141 0

集成apacheflow和数据块:用apachespark构建ETL管道

这是一系列关于将数据块与常用软件包集成的博客之一。请参阅结尾处的"下一步是什么"部分来阅读本系列中的其他内容,其中包括如何为AWS Lambda、Kinesis等等。阿帕奇气流概述Airflow是一个以编程方式编写、计划和监视工作流的平台。它可以用于将工作流作为任务的有向无环图(DAG)来编写。Airflow计划程序在遵循指定的依赖关系的同时对一组工作线程执行任务。气流有两个关键概念:DAG描述如何运行工作流,而操作员通过定义工作流中的任务来确定实际完成的任务。操作符通常(但不总是)原子的,这意味着它们可以独立存在,不需要与任何其他操作符共享资源。Airflow是一个异构的工作流管理系统,可以在云端和内部部署多个系统。如果数据块是更大系统的一个组成部分,例如ETL或机器学习管道,气流可以用于调度和管理。Airflow已经可以与一些常用的系统一起工作,比如S3、MySQL或HTTP端点;还可以轻松地为其他系统扩展基本模块。本博客假设已经有气流启动并运行的实例。有关如何设置气流的读数,请参阅"参考"部分。如何使用气流与数据块Databricks restapi支持对Databricks的编程访问(而不是通过webui)。它可以自动创建和运行作业,生产数据流,等等。它还允许我们通过气流操作员将气流与数据链集成。Airflow为许多常见任务提供了运算符,您可以使用BashOperator和Sensor运算符来解决许多典型的ETL用例,例如触发每日ETL作业以在AWS S3或数据库中的行记录中发布更新。bash操作员BashOperator执行bash命令。它可用于通过Databricks API与Databricks集成,以启动预配置的Spark作业,例如:t0=BashOperator(task_id='dbjob',取决于过去=错误,bash_command='curl-xpost-u用户名:密码https://.cloud.databricks.com/api/2.0/jobs/run now-d \'{\"作业\uid\":}\'',dag=dag)您可以通过键入以下内容来测试此运算符:%气流测试教程dbjob 2016-10-01在上面的示例中,操作员在Databricks中启动一个作业,JSON加载是一个键/值(job_id和实际的作业编号)。注意:您也可以使用SimpleHTTPOperator来获得相同的结果,而不是在BashOperator中使用curl。传感器操作员传感器操作员一直运行,直到满足标准。示例包括:在S3存储桶(S3KeySensor)中登录的某个文件,或对某个端点(HttpSensor)的httpget请求;在每次重试之间设置正确的时间间隔非常重要,"poke峎u interval"。在ETL工作流中,有必要使用一个与持久层"挂钩"的Sensor操作符来执行推送通知。JSON文件到拼花处理示例下面是一个设置管道来处理JSON文件并每天使用Databricks将它们转换为parquet的示例。Airflow通过检测每日文件何时准备好进行处理,并设置"S3传感器"以检测每日作业的输出并发送最终电子邮件通知,从而协调此管道。管道设置:如上所示,该管道有五个步骤:输入S3传感器(检查_S3_是否有文件_S3)检查输入数据是否存在:s3:///输入气流/输入-*Databricks REST API(dbjob),basheoperator对Databricks进行restapi调用并动态传递文件输入和输出参数。为了说明本博客中的要点,我们使用下面的命令;对于您的工作负载,如果在flow Python配置文件中输入S3密钥是一个安全问题,那么有许多方法可以维护安全性:卷曲-X后-u:\https://.cloud.databricks.com/api/2.0/jobs/run-now\-d'{"作业编号":"笔记本电脑参数":{"inputPath":"s3a://:@/input/测试.json","outputPath":"s3a://:@/output/sample_parquet_data"}'上面是从气流调用的Databricks中的作业的屏幕截图。阅读Databricks作业文档了解更多信息。Databricks操作包括读取输入JSON文件并将其转换为parquet:val inputPath=getArgument("inputPath","test")val测试数据=sqlContext.read.json(输入路径)val outPath=getArgument("outputPath","test")testJsonData.write.format("parquet").mode("overwrite").save(outPath)输出S3传感器(检查输出数据是否存在):s3:///输出气流/样品_镶木地板_数据/\u成功Email Notification(Email_Notification),在作业成功时发送电子邮件提醒。下面是此气流管道的Python配置文件:从气流导入DAG从气流操作员导入BashOperator、S3KeySensor、EmailOperator从datetime导入datetime,timedelta今天日期=日期时间。今天()默认参数={"所有者":"气流","取决于过去":错误,"开始日期":今天,'电子邮件':['@databricks.com网站'],"失败时发送电子邮件":False,"在重试时发送电子邮件":False,"重试次数":1,"重试延迟":timedelta(分钟=5),}dag=dag('tutorial',default_args=default_args,schedule_interval='@once')输入传感器=S3KeySensor(task_id='check_s3_for_file_in_s3',bucket_key='input-FLOW/input-*',通配符_match=True,bucket_name='peyman-datapipeline',s3_conn_id='S3Connection',超时=18*60*60,波谷间隔=120,dag=dag)dbtask=BashOperator(task_id='dbjob',取决于过去=错误,bash_command='curl-X POST-u:https://demo.cloud.databricks.com/api/2.0/jobs/run-now-d\'{\"作业id\":,\"笔记本电脑参数\"":{\"输入路径\"":\":