云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

云数据库_数据库管理系统的组成_返现

小七 141 0

使用审计日志监视Databricks工作区

云计算已经从根本上改变了公司的运营方式,用户不再受制于内部硬件部署的限制,例如资源的物理限制和繁重的环境升级过程。随着云服务的便利性和灵活性,如何正确地监控用户如何利用这些方便可用的资源带来了挑战。如果不这样做,可能会导致有问题且代价高昂的反模式(既有云提供商的核心资源,也有类似PaaS的数据块)。Databricks在设计上是云本地的,因此与公共云提供商(如微软和亚马逊Web服务)紧密结合,充分利用了这种新的范例,审计日志功能为管理员提供了一种集中的方式来理解和管理平台上发生的活动。管理员可以使用Databricks审计日志来监视模式,例如在给定的一天中集群或作业的数量、执行这些操作的用户以及被拒绝进入工作区的任何用户。在本系列的第一篇博客文章Trust but Verify with Databricks中,我们介绍了Databricks管理员如何使用Databricks审计日志和其他云提供商日志作为其云监控场景的补充解决方案。各安全平台的管理员可以使用RICKS和安全平台中的各种资源来跟踪RICK的数据。在本文中,我们将详细介绍这些角色如何处理和分析审计日志,以跟踪资源使用情况并识别潜在代价高昂的反模式。ETL设计审核日志Databricks按照JSON格式的交付SLA将所有启用的工作区的审计日志交付到客户拥有的aws3 bucket。这些审核日志包含与主要资源(如群集、作业和工作区)相关的特定操作的事件。为了简化交付和客户的进一步分析,Databricks将每个操作的每个事件记录为单独的记录,并将所有相关参数存储到名为requestParams的稀疏结构类型中。为了使这些信息更容易访问,我们建议使用基于结构化流和Delta-Lake的ETL过程。利用结构化流媒体,我们可以:将状态管理留给专门为状态管理而构建的构造。我们不必考虑自上一次运行以来经过了多少时间来确保我们只添加了正确的记录,我们可以利用结构化流的检查点和预写日志来确保我们只处理新添加的审核日志文件。我们可以将流式查询设计为类似于伪批处理作业的triggerone日常作业利用三角洲湖,我们可以做到以下几点:优雅地处理模式演化,特别是针对requestParams字段,该字段可能具有基于审计日志中跟踪的新操作的新StructField轻松利用表到表流利用特定的性能优化(如优化)来最大限度地提高读取性能作为参考,这是Databricks推荐的medallion参考架构:青铜:管道的初始着陆区。我们建议复制尽可能接近其原始形式的数据,以便在需要时从一开始就轻松地重放整个管道Silver:原始数据被清理(考虑数据质量检查),被转换并可能被外部数据集丰富黄金:整个公司都可以依赖的生产级数据进行商业智能、描述性统计和数据科学/机器学习遵循我们自己的medallion架构,我们对审计日志ETL设计进行了如下划分:原始数据到青铜表Databricks使用基于文件的结构化流将原始JSON文件流传输到青铜Delta-Lake表。这将创建一个原始数据的持久副本,以便我们在下游表中发现任何问题时重放ETL。Databricks以JSON的形式向客户指定的aws3 bucket交付审计日志。与其编写逻辑来确定Delta-Lake表的状态,不如利用结构化流的预写日志和检查点来维护表的状态。在本例中,我们将ETL设计为每天运行一次,因此我们使用triggerOnce的文件源来模拟流式框架的批处理工作负载。由于结构化流需要我们显式地定义模式,所以我们将读取原始JSON文件一次来构建它。流模式=spark.read.json(sourceBucket).schema然后,我们将使用我们推断的模式和原始审计日志的路径实例化StreamReader。流DF=(火花.读流.format("json").schema(streamSchema).load(sourceBucket))然后,我们实例化StreamWriter,并将原始审计日志写到一个按日期分区的青铜Delta-Lake表中。(streamDF).writeStream公司.格式("delta").partitionBy("日期").outputMode("append").option("checkpointLocation","{}/checkpoints/bronze".format(sinkBucket)).option("path","{}/streaming/browner".format(sinkBucket)).option("mergeSchema",True).trigger(一次=真)。开始())现在我们已经在aws3 bucket上创建了表,我们需要将该表注册到Databricks配置单元metastore,以便最终用户更容易地访问数据。在创建青铜表之前,我们将创建逻辑数据库审计日志。创建数据库(如果不存在审核日志)火花.sql("""创建表(如果不存在)审核_原木.青铜色使用DELTA位置{}/streaming/bronze'"."格式(sinkBucket))如果您以批处理或伪批处理的方式更新Delta-Lake表,最佳做法是在更新后立即运行OPTIMIZE。优化审核_原木.青铜色青铜色到银色的桌子从bronze Delta Lake表流到silver Delta Lake表,以便它采用稀疏的requestParams StructType并剥离每个记录的所有空键,同时执行其他一些基本转换,如从嵌套字段解析电子邮件地址和将UNIX epoch解析为UTC时间戳。由于我们以通用JSON格式发布所有Databricks资源类型的审计日志,所以我们定义了一个名为requestParams的规范结构,它包含所有资源类型的键的联合。最后,我们将为每个服务创建单独的表,因此我们希望剥离每个表的requestParams字段,以便它只包含资源类型的相关键。为了实现这一点,我们定义了一个用户定义函数(UDF),以除去requestParams中所有具有空值的键。def stripNulls(原始):返回json.dumps文件({i:原始.asDict()[我]为了我原始.asDict()如果原始.asDict()[我]!=无})strip_udf=udf(stripNulls,StringType())我们从青铜三角洲湖表中实例化了一个StreamReader:布朗泽夫=(火花.读流.load("{}/streaming/bronze".format(sinkBucket)))然后,我们将以下转换应用于来自bronze Delta Lake表的流数据:从requestParams中剥离空键并将输出存储为字符串分析来自userIdentity的电子邮件从timestamp字段解析实际的timestamp/timestamp数据类型,并将其存储在date_time中删除原始的requestParams和userIdentity查询=(布朗泽夫.withColumn("扁平化",strip_udf("requestParams")).withColumn("电子邮件",列("用户标识.email")).withColumn("date_time",from_utc_timestamp(from_unixtime(col("timestamp")/1000,"utc")).drop("请求参数").drop("用户标识"))然后,我们将这些转换后的记录流到银三角洲湖表中:(查询.writeStream公司.格式("delta").partitionBy("日期").outputMode("append").option("checkpointLocation","{}/checkpoints/silver".format(sinkBucket)).option("path","{}/streaming/silver".format(sinkBucket))选项("True.schema",合并).trigger(一次=真)。开始())同样,由于我们已经创建了一个基于aws3 bucket的表,所以我们需要在vive Metastore中注册它,以便于访问。火花.sql("""创建表(如果不存在)审核_原木.银使用DELTA位置{}/streaming/silver'"."格式(sinkBucket))尽管结构化流可以保证只处理一次,但我们仍然可以添加一个断言来检查青铜三角洲湖表和银三角洲湖表的计数。断言(火花表("审计_原木.青铜色").count()==火花表("审计_原木.银").count())对于前面的青铜表,我们也将在silver表的更新之后运行OPTIMIZE。优化审核_原木.银银桌到金桌流到审计日志中跟踪的每个Databricks服务的各个gold Delta Lake表goldaudit日志表是Databricks管理员用来进行分析的。随着requestParams字段在服务级别上的缩减,现在更容易处理分析和相关内容。由于Delta Lake能够优雅地处理模式演化,随着Databricks跟踪每种资源类型的附加操作,gold表将无缝更改,无需硬编码模式或照看错误。在ETL过程的最后一步,我们首先定义一个UDF来解析原始requestParams字段的精简版本中的键。def justKeys(字符串):返回[我为我在json.loads(字符串).keys()]just_keys_udf=udf(justKeys,StringType())对于ETL的下一大块,我们将定义一个函数,该函数实现以下功能:收集给定serviceName(资源类型)的每个记录的密钥创建一组这些键(以删除重复项)从这些键创建一个模式来应用于给定的serviceName(如果serviceName在requestParams中没有任何键,我们给它一个名为placeholder的键架构)在silver Delta Lake表中为每个服务名称写出单独的gold Delta Lake表def flattetable(服务名称,bucketName):平坦流=spark.readStream.load("{}/streaming/silver".f)