百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 文章教程 > 正文

利用SparkSQL Logical Plan Parse 打造大数据平台SQL诊断利器

xsobi 2024-12-03 19:16 1 浏览

前言

对一个开源项目来说,虽然各种卷,动不动去深入研究源码啥的,但是没有真正去参与开发的话,了解里头的原理又少那么点感觉。实际情况来说很少机会去参与源码的改造吧,这里我提供一些思路,就是基于源码倒腾一些小工具,这样子有作用而且加深那些原理的理解! 利用我们的源码,打造一款SQL的扫描工具~~

原理篇

Spark被大家津津乐道的经典SQL解析流程

  1. Sql语句经过Antlr4解析,生成Unresolved Logical Plan
  2. analyzer与catalog进行绑定(catlog存储元数据),生成Logical Plan;
  3. optimizer对Logical Plan优化,生成Optimized LogicalPlan;
  4. SparkPlan将Optimized LogicalPlan转换成 Physical Plan;
  5. prepareForExecution()将 Physical Plan 转换成 executed Physical Plan;
  6. execute()执行可执行物理计划,得到RDD;

需求分析

当然,对一个数据分析平台来说,能够到提交SQL其实是比较后面的一步了,实际sql可能是下面的样子

/**
这里是注释
**/

set mapreduce.input.fileinputformat.split.maxsize=128000000;
set hive.exec.reducers.bytes.per.reducer=200000000;  --参数部分

insert overwrite table dwd_user_dd partition (dt='${dt}')
select id,name,age from user where dt='${bizdate}' ;

====step:1====

--后续一堆乱七八糟的

平台把这么一坨sql解析成可以执行的SQL的话需要多做一些事情:

  1. 把注释去掉
  2. 替换参数数据
  3. 执行参数
  4. 提交

因为提交SQL的时候也是需要申请Driver的资源的,如果有点语法错误什么的,资源紧张的情况下,需要等蛮久才执行出来报错。 这个其实是大问题,尤其是资源紧张这个事情,大部分同学都发现平台资源就没有不紧张的时候吧,顶多是300%变成100%的使用,也就个变化空间。我们的目标便是搞一个SQL扫描的小工具,在提交之前就检测出来,提升效率。

有问题的sql应该在一开始的的时候就禁止用户提交,举例说明:

规则 内容 禁止用户建库 禁用create database 禁止用户删库 禁止drop database 禁止用户建正式表 正式表由平台方统一建立,不允许create table xxx 禁止删除正式表 正式表不允许删除 drop table table xxx 禁止在SQL中指定队列 用户不允许指定队列,平台统一调控 不能用INSERT OVERWRITE写入临时库的table 临时表不允许长周期

类似的规则其实很多,比如说禁止全量扫描大表,之类的,这个是按照实际需求来。 可能有同学会很疑惑,

  1. 为什么建表这种事情都要控制。 一般来说正式的表要求流程规范,而且需要有比较完整地描述信息,那么直接的etl语句中不允许的 ,正式表的话有数仓的流程规范,类似dwd/dws这种取名规范,所以在实际的开发中也需要做限制。
  2. hive之类的不是有权限控制么,为什么还需要单独校验 几方面原因,首先上面提到了sql语句是不希望用户提交,所以这个环节还没到执行阶段呢,其次,类似数仓的规范,hive其实做不到的,集团内部的建表规范是个性化定制的,hive只能在一个用户权限范围内去禁止建表之类的,但是对于哪些表名禁止,这个就无能为力了

思路分析

正常的SQL解析是需要走文法那套的,Antlr4也是可以的,只是这样子又从入门到放弃了,利用SparkSQL自己的API就可以做,我们只需要把Master指向lcoal就可以

 public static SparkSession spark = SparkSession.builder()
            .appName("sql parser")
            .master("local[*]")
            .enableHiveSupport()
            .getOrCreate();
public static SessionState sessionState = spark.sessionState();

生成 Unresolved Logical Plan

Unresolved Logical Plan我们直接做转化就可以得到:

 public static LogicalPlan mkPlan(String sql) throws ParseException {
        return sessionState.sqlParser().parsePlan(sql);
    }

来个实际的例子 :

CREATE TABLE `student`(
  `stu_id` int, 
  `name` string, 
  `sex` string, 
  `age` int, 
 cls_id int )
 PARTITIONED BY ( 
  `dt` string COMMENT ' day partitioned')
  
CREATE TABLE `class_inf`(
  `cls_id` int, 
  `cls_name` string 
 )
 PARTITIONED BY ( 
  `dt` string COMMENT ' day partitioned')

生成表之后我们用前面的代码直接生成Unresolved Logical Plan,

create table tmp_user_info 
as 
select x.stu_id,x.name,y.cls_name from 
( select  cls_id,stu_id,name,sex,age from  student  where dt='20211211' ) x 
left join 
(select  cls_id,cls_name from  class_inf  where dt='20211211') y 
on x.cls_id=y.cls_id
  String querysql="create table tmp_user_info as select x.stu_id,x.name,y.cls_name from ( select  cls_id,stu_id,name,sex,age from  student  where dt='20211211' ) x left join (select  cls_id,cls_name from  class_inf  where dt='20211211') y on x.cls_id=y.cls_id ";
        SparkDriver.spark.sql("use csdn");
        SessionState sessionState = spark.sessionState();
        LogicalPlan plan=  sessionState.sqlParser().parsePlan(querysql);
        System.out.println(plan);

执行结果如下:

'CreateTable `tmp_user_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists
+- 'Project ['x.stu_id, 'x.name, 'y.cls_name]
   +- 'Join LeftOuter, ('x.cls_id = 'y.cls_id)
      :- 'SubqueryAlias x
      :  +- 'Project ['cls_id, 'stu_id, 'name, 'sex, 'age]
      :     +- 'Filter ('dt = 20211211)
      :        +- 'UnresolvedRelation `student`
      +- 'SubqueryAlias y
         +- 'Project ['cls_id, 'cls_name]
            +- 'Filter ('dt = 20211211)
               +- 'UnresolvedRelation `class_inf`

这个阶段,我们可以得到以下的信息:

  1. 如果sql语法本身有错误,那么这个sql也是不可执行的,可以直接帮我们校验出来
  2. 这个阶段其实已经可以拿到我们扫描到我们执行相关的信息,我们其实可以针对信息做一些规则就可以

生成 Analyzed Logical Plan

 LogicalPlan  analyzer= sessionState.executePlan(plan).analyzed();
        System.out.println(analyzer);

结果如下:

CreateHiveTableAsSelectCommand [Database:csdn}, TableName: tmp_user_info, InsertIntoHiveTable]
+- Project [stu_id#18, name#19, cls_name#25]
   +- Join LeftOuter, (cls_id#22 = cls_id#24)
      :- SubqueryAlias x
      :  +- Project [cls_id#22, stu_id#18, name#19, sex#20, age#21]
      :     +- Filter (dt#23 = 20211211)
      :        +- SubqueryAlias student
      :           +- HiveTableRelation `csdn`.`student`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [stu_id#18, name#19, sex#20, age#21, cls_id#22], [dt#23]
      +- SubqueryAlias y
         +- Project [cls_id#24, cls_name#25]
            +- Filter (dt#26 = 20211211)
               +- SubqueryAlias class_inf
                  +- HiveTableRelation `csdn`.`class_inf`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cls_id#24, cls_name#25], [dt#26]

未解析的计划解析一遍,就可以拿到进一步的执行信息。

遍历树

这个是控制台打印的结果,其实就是一颗逻辑解析树,我们直接画出来:

sql解析的结果本身也是树模型,那么我们怎么去扫描这棵树呢,这个时候得想到我们大学时候的 绿皮书了,"树的遍历深度优先搜索栈或者递归,广度优先搜索搞队列"~~

LogicalPlan 里面就是按照树状结构进行保存,我们引入一个队列,就可以把里面的节点全部看一遍,遍历三部曲,待访问节点入队列->访问队首节点->子节点再入队列,直到队列空:

 Queue<LogicalPlan> queue = new ArrayDeque<>();
        queue.offer(plan);

        while (!queue.isEmpty()) {
            //访问元素出队列
            LogicalPlan logicalPlan = queue.poll();
            System.out.println("logicalPlan->"+logicalPlan.getClass().getName());
            if (logicalPlan instanceof Project) {
                visitProject((Project) logicalPlan);
            }

            if(logicalPlan instanceof Join){
                visitJoin((Join) logicalPlan);
            }

            if(logicalPlan instanceof Filter){
                visitFilter((Filter) logicalPlan);
            }

            if(logicalPlan instanceof HiveTableRelation){
                visitScanHiveTable((HiveTableRelation) logicalPlan);
            }

            //子节点入队列
            List<LogicalPlan> childrenPlan = scala.collection.JavaConversions
                    .seqAsJavaList(logicalPlan.children());
            if (childrenPlan.size() != 0) {
                for (int i = 0; i < childrenPlan.size(); i++) {
                    queue.offer(childrenPlan.get(i));
                }
            }
        }

要注意的是LogicalPlan在定义的时候是一个抽象类来着,所以具体访问的时候我们需要根据类型转化,完成访问过程,LogicalPlan定义如下:

abstract class LogicalPlan
  extends QueryPlan[LogicalPlan]
  with AnalysisHelper
  with LogicalPlanStats
  with QueryPlanConstraints
  with Logging {

至于具体有哪些呢,我们可以用idea的子类工具查看一下

其实是在执行阶段的转换不一样的,因为我们并不需要全部都处理,暴力点的办法就直接打印出来,我在代码里面是有打印操作的,所以可以看到具体的实例:

logicalPlan->org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
logicalPlan->org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
logicalPlan->org.apache.spark.sql.catalyst.plans.logical.Project
logicalPlan->org.apache.spark.sql.catalyst.plans.logical.Project
logicalPlan->org.apache.spark.sql.catalyst.plans.logical.Filter
logicalPlan->org.apache.spark.sql.catalyst.plans.logical.Filter
logicalPlan->org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
logicalPlan->org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

增加校验规则

当我们可以拿到树的解析信息的时候,我们便可以把规则加到上面了,各种子逻辑特定处理就好了,我这里做一个限制建表的规则,我们前面代码有碰到建表的时候会提供一个方法去处理,我们前提假设,表名里面不包含temp的就不允许建表:

具体的方法实现如下:

  if(logicalPlan instanceof CreateHiveTableAsSelectCommand){
                    CatalogTable table = ((CreateHiveTableAsSelectCommand) logicalPlan).tableDesc();
                    Option<String> db = table.identifier().database();
                    System.out.println("本次建表库名是:"+db.get());
                    System.out.println("本次建表表名是:"+table.qualifiedName());
                    if (db.nonEmpty() && !db.get().contains("temp")) {
                        String qualifiedName = table.qualifiedName();
                        System.out.println("表名"+qualifiedName+"不符合建表规范,请检查");
                    }
                    
            }

我们重新执行刚才的语句,输出效果如下:

logicalPlan->org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
本次建表库名是:csdn
本次建表表名是:csdn.tmp_user_info
表名csdn.tmp_user_info不符合建表规范,请检查

到这一步为止我们真正可以去做一个sql的校验工作了。

更加丰富的元数据扫描

因为Spark本来就需要利用这些元数据做一些优化,我们可以拿到的数据其实是非常丰富的,涉及表的数据格式,大小,join时候的类型,其实可以构建很丰富的规则的。另外一个场景来说,我们可以作为bad的sql的批量扫描处理,在公司内部是一个数据治理的利器。

Type: MANAGED
Provider: hive
Table Properties: [transient_lastDdlTime=1639296105]
Statistics: 9223372036854775807 bytes
Location: file:/D:/IdeaProject/spark-metrics/spark-warehouse/csdn.db/student
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Storage Properties: [serialization.format=1]
Partition Provider: Catalog
Partition Columns: [`dt`]
Schema: root
 |-- stu_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- cls_id: integer (nullable = true)
 |-- dt: string (nullable = true)
)

基于这些场景,我们扩展了类似数据格式的校验用于公司升级orc,统计热点数据扫描,平滑切换到alluxion的内存介质,还有出现了join 左右数据类型不一致导致的结果不正确等。

后记

  1. SQL解析的核心这个可以用于生产的,实际生产还需要加上对平台注释和参数的设置,整个因为需要批量扫描和在线校验,所以也是进行了接口处理。
  2. 这个工具还有一个背景是校验哪些些SQL可以从hive切换成SparkSQL来着,所以被校验之后的sql其实可以直接平滑的切换
  3. Presto、ClickHouse等一些多元化引擎的场景接入,其实在解析阶段还需要进行改造
  4. 更复杂的倾斜,参数等场景也是有落地,当然需求永远不会有完结的时候 最后附上一下生产上的

相关推荐

js向对象中添加元素(对象,数组) js对象里面添加元素

一、添加一个元素对象名["属性名"]=值(值:可以是一个值,可以是一个对象,也可以是一个数组)这样添加进去的元素,就是一个值或对象或数组...

JS小技巧,如何去重对象数组?(一)

大家好,关于数组对象去重的业务场景,想必大家都遇到过类似的需求吧,这对这样的需求你是怎么做的呢。下面我就先和大家分享下如果是基于对象的1个属性是怎么去重实现的。方法一:使用.filter()和....

「C/C++」之数组、vector对象和array对象的比较

数组学习过C语言的,对数组应该都不会陌生,于是这里就不再对数组进行展开介绍。模板类vector模板类vector类似于string,也是一种动态数组。能够在运行阶段设置vector对象的长度,可以在末...

如何用sessionStorage保存对象和数组

背景:在工作中,我将[{},{}]对象数组形式,存储到sessionStorage,然后ta变成了我看不懂的形式,然后我想取之用之,发现不可能了~记录这次深刻的教训。$clickCouponIndex...

JavaScript Array 对象 javascript的array对象

Array对象Array对象用于在变量中存储多个值:varcars=["Saab","Volvo","BMW"];第一个数组元素的索引值为0,第二个索引值为1,以此类推。更多有...

JavaScript中的数组Array(对象) js array数组

1:数组Array:-数组也是一个对象-数组也是用来存储数据的-和object不同,数组中可以存储一组有序的数据,-数组中存储的数据我们称其为元素(element)-数组中的每一个元素都有一...

数组和对象方法&amp;数组去重 数组去重的5种方法前端

列举一下JavaScript数组和对象有哪些原生方法?数组:arr.concat(arr1,arr2,arrn);--合并两个或多个数组。此方法不会修改原有数组,而是返回一个新数组...

C++ 类如何定义对象数组?初始化数组?linux C++第43讲

对象数组学过C语言的读者对数组的概念应该很熟悉了。数组的元素可以是int类型的变量,例如int...

ElasticSearch第六篇:复合数据类型-数组,对象

在ElasticSearch中,使用JSON结构来存储数据,一个Key/Value对是JSON的一个字段,而Value可以是基础数据类型,也可以是数组,文档(也叫对象),或文档数组,因此,每个JSON...

第58条:区分数组对象和类数组对象

示例设想有两个不同类的API。第一个是位向量:有序的位集合varbits=newBitVector;bits.enable(4);bits.enable([1,3,8,17]);b...

八皇后问题解法(Common Lisp实现)

如何才能在一张国际象棋的棋盘上摆上八个皇后而不致使她们互相威胁呢?这个著名的问题可以方便地通过一种树搜索方法来解决。首先,我们需要写一个函数来判断棋盘上的两个皇后是否互相威协。在国际象棋中,皇后可以沿...

visual lisp修改颜色的模板函数 怎么更改visual studio的配色

(defunBF-yansemokuai(tuyuanyanse/ss)...

用中望CAD加载LISP程序技巧 中望cad2015怎么加载燕秀

1、首先请加载lisp程序,加载方法如下:在菜单栏选择工具——加载应用程序——添加,选择lisp程序然后加载,然后选择添加到启动组。2、然后是添加自定义栏以及图标,方法如下(以...

图的深度优先搜索和广度优先搜索(Common Lisp实现)

为了便于描述,本文中的图指的是下图所示的无向图。搜索指:搜索从S到F的一条路径。若存在,则以表的形式返回路径;若不存在,则返回nil。...

两个有助于理解Common Lisp宏的例子

在Lisp中,函数和数据具有相同的形式。这是Lisp语言的一个重大特色。一个Lisp函数可以分析另一个Lisp函数;甚至可以和另一个Lisp函数组成一个整体,并加以利用。Lisp的宏,是实现上述特色的...