云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

分布式数据库_华为云服务产品_企业级

小七 141 0

利用AWS-Kinesis、RDS和Databricks优化油气资产

在数据库里试试这个笔记本成功的关键是始终如一地做出好的决定,而做出好决定的关键是掌握好信息。这种信念是大数据爆炸性兴趣背后的主要推动力。我们都凭直觉知道,获取更多数据有可能获得更好的数据,从而做出更好的决策,但更多的数据本身并不一定会导致更好的决策。我们还必须筛选数据,发现好的信息。在资本密集型行业,有效地做到这一点尤为重要。石油和天然气行业是一个资产密集型企业,其资本资产包括从钻机、海上平台和油井到管道、液化天然气接收站和炼油厂(图1)。这些资产的设计、建造、运营和维护成本高昂。对五大石油巨头(英国石油、康菲石油、埃克森美孚、壳牌、道达尔)财务报表的分析显示,厂房、物业和设备平均占总资产的51%。有效地管理这些资产需要石油和天然气行业利用先进的机器学习和分析能力,对超大容量的数据进行批量和实时分析。apachespark是处理此类工作负载的理想选择,Databricks是构建apachespark解决方案的理想平台。在本博客中,我们将解决油气行业的一个典型问题——资产优化。我们将演示包含三个部分的解决方案:AWS-Kinesis流式实时数据;AWS RDS存储我们的历史数据;数据块处理来自RDS和Kinesis的数据,以确定最佳资产水平。资产优化背景"资产"是指企业为创收而使用的有形物品——原材料、设备等。企业经营活动消耗资产(即磨损一件设备),必须对其进行补充才能继续创收。对补充时间和数量的估计是资产优化的核心,因为错误是代价高昂的:如果企业耗尽原材料,收入将停止流动,而过量的库存会导致持有成本。理想情况下,资产优化基于对近实时消费数据的分析,准确确定正确的资产水平。我们的目标是精确地估计在订单到达所需的时间内有多少库存将被使用。资产优化示例在资本密集型的石油和天然气行业,每一小时的低效资产运营或计划外停机都要耗费数百万美元。在当前物联网(IoT)大数据时代,资产优化的重点是持续监控资产的关键运行特征,并应用先进的机器学习来最大限度地提高资产性能,最大限度地减少计划外停机。这就是大数据和先进分析技术的用武之地。在博客的其余部分,我们将看一个发电厂的例子,在这个例子中,我们实时监控资产计量表,并对关键测量值建模,以确定资产是否处于最佳运行状态。我们通过将一个分布拟合到我们所拥有的有限的交货期数据,然后从该分布中取样,从而对此进行建模。拟合分布是最慢的部分,因为必须使用马尔可夫链蒙特卡罗(MCMC)进行数值拟合,对于我们的资产,这需要100000次迭代的循环,而这不能并行进行。整个过程必须针对数据集中的每种材料进行,这取决于工厂,这可能是3000+。每种材料都可以独立并并行地进行分析。让我们看看如何将AWS Kinesis、RDS和Databricks一起使用(注意:您还可以在Databricks笔记本中看到示例代码)。流式传感器读数与AWS-Kinesis第一步:导入正确的Kinesis库。本例假设Spark 2.0.1(Scala 2.11)。在这个特别的笔记本中,确保您已经为您的集群和相应的kinesis客户端库附加了Maven依赖项spark streaming kinesis。第二步:配置你的动态流。//===控制应用程序流的配置===val stopActiveContext=真//"true"=如果任何现有的StreamingContext正在运行,则停止;//"false"=不要停止,让它不受干扰地运行,但您的最新代码可能不会被使用//===火花流的配置===val batchIntervalSeconds=10val eventsPerSecond=1000//对于虚拟源//确认所连接的火花组为1.4.0+要求(sc.版本.替换(".","").toInt>=140)步骤3:定义使用流的函数。此函数使用我们为演示动态而创建的虚拟流。我们使用后者的数据作为JSON文件暂存。进口scala.util.随机进口org.apache.spark网站.流媒体.receiver._类DummySource(ratePerSec:Int)扩展Receiver[String](StorageLevel.MEMORY_和_磁盘_2) {定义onStart(){//启动通过连接接收数据的线程新线程("虚拟源"){重写def run(){receive()}}。开始()}定义onStop(){//与调用receive()的线程相比,没有什么可做的//被设计为自行停止isstop()返回false}/**创建一个套接字连接并接收数据,直到接收器停止*/私有定义接收(){同时(!isStopped()){存储("我是一个虚拟源"+随机.nextInt(10) )线程睡眠(1000.toDouble/ratePerSec.toInt)}}}在AWS RDS中存储历史数据让我们连接到一个关系数据库来查看我们的主数据,并选择要为其创建第一个模型的发电厂。我们将使用Redshift作为我们的数据库,但是连接到任何数据库的步骤基本相同。对于我们的模拟,Redshift是存储有关资产的主数据的地方。在现实世界中,这些数据可以存储在任何关系数据库中。步骤1:从整个Redshift表创建一个DataFrame步骤2:创建临时视图第三步:选择并查看发电厂列表我们可以使用ansisql来研究我们的主数据,并决定我们希望在初始分析中使用什么资产。用数据链进行监测和异常检测第一步:加载数据从分段JSON数据中获取度量数据。在现实世界中,这将直接来源于Kinesis或其他流媒体技术,正如我在上面的虚拟示例中所展示的那样。从JSON文件加载分段数据:安装列表=[{'bucket':'databricks-corp-training/structured_streaming/devices','mount_folder':'/mnt/sdevices'}]对于装载列表中的装载点:bucket=安装点['bucket']mount_folder=挂载点['mount_folder']尝试:dbutils.fs.ls(mount_文件夹)dbutils.fs.unmount(mount_文件夹)除了:通过最后:#如果MOUNT_文件夹不存在dbutils.fs.安装("s3a://"+ACCESSY_KEY_ID+":"+SECRET_ACCESS_KEY+"@"+bucket,mount_文件夹)为JSON设备数据定义一个模式,这样Spark就不必推断它了:进口org.apache.spark网站.sql.types._//获取上传到文件存储的JSON设备信息val jsonFile="dbfs:/mnt/sdevices/"val jsonSchema=新结构类型().add("电池电量",LongType).add("c02_级别",长型).add("cca3",StringType).add("cn",StringType).add("设备标识",LongType).add("设备类型",StringType).add("信号",长型).add("ip",StringType).add("temp",长型).add("时间戳",时间类型)使用指定的架构从挂载目录读取JSON文件。提供模式可避免Spark推断模式,从而使读取操作更快:val devicesDF=火花.阅读.schema(jsonSchema).json(jsonFile)第二步:让我们探索我们的数据显示(设备数据框)第3步:可视化我们的数据//导入一些SQL聚合和窗口函数进口org.apache.spark网站.sql.函数._val staticCountsDF=设备数据框。选择("设备类型","电池电量")。式中("信号