首页 > Spark SQL 优化笔记

Spark SQL 优化笔记

互联网 2021-04-21 13:30:55

我的原创地址:https://dongkelun.com/2018/12/26/sparkSqlOptimize/

前言

记录自己在工作开发中遇到的SQL优化问题

1、避免用in 和 not in解决方案:用exists 和 not exists代替用join代替not exists示例

not in:

select stepId,province_code,polyline from route_step where stepId not in (select stepId from stepIds)

not exists:

select stepId,province_code,polyline from route_step where road!='解析异常' andnot exists (select stepId from stepIds where route_step.stepId = stepIds.stepId)自己遇到的问题

上面not in会抛出异常

18/12/26 11:20:26 WARN TaskSetManager: Stage 3 contains a task of very large size (17358 KB). The maximum recommended task size is 100 KB.Exception in thread "dispatcher-event-loop-11" java.lang.OutOfMemoryError: Java heap space

首先会导致某个task数量很大,且总task数量很少(task数目不等于rdd或df的分区数,目前不知道原因),接着报java.lang.OutOfMemoryError,试了很多方法,最后用not exists,没有上面的异常

效率

not in慢的原因是 not in不走索引

疑问:not in是非相关子查询,not exists是相关子查询,而从理论上来说非相关子查询比相关子查询效率高(看下面的参考),但是这里却相反,矛盾,不知道为啥~

参考博客:[笔记] SQL性能优化 - 避免使用 IN 和 NOT INSQL优化——避免使用Not IN嵌套查询:相关子查询和非相关子查询2、in 会导致数据倾斜

longitudeAndLatitudes和lineIds都有160个分区,且数据平衡(每个分区的数目差不多),但是下面的语句则有问题

select * from longitudeAndLatitudes where lineIdin (select lineId from lineIds)

虽然分区数还是160,但是只有两三个分区有数,其他分区的数量都为0,这样就导致数据倾斜,程序执行很慢,如果非要用in的话,那么需要repartition一下

3、大表join小表

策略:将小表广播(broadcast)参数:spark.sql.autoBroadcastJoinThreshold 默认值10485760(10M),当小表或df的大小小于此值,Spark会自动的将该表广播到每个节点上原理:join是个shuffle类算子,shuffle时,各个节点上会先将相同的key写到本地磁盘,之后再通过网络传输从其他节点的磁盘文件在拉取相同的key,因此shuffle可能会发生大量的磁盘IO和网络传输,性能很低,而broadcast先将小表广播到每个节点,这样join时都是在本地完成,不需要网络传输,所以会提升性能

注意:broadcast join 也称为replicated join 或者 map-side join具体操作

提交代码时适当调大阈值,如将阈值修改为100M,具体看自己环境的内存限制和小表的大小

--conf spark.sql.autoBroadcastJoinThreshold=104857600

如何看是否进行了broadcast join:以df为例(df是join之后的结果)

df.explain

如果为broadcast join,则打印:

== Physical Plan ==*(14) Project [lineId#81, stepIds#85, userId#1, freq#2]+- *(14) BroadcastHashJoin [lineId#81], [lineId#42], Inner, BuildLeft...

能看到关键字BroadcastHashJoin即可,否则打印:

== Physical Plan ==*(17) Project [lineId#42, stepIds#85, freq#2, userId#1]+- *(17) SortMergeJoin [lineId#42], [lineId#81], Inner...

能看到SortMergeJoin即可

查看阈值:

val threshold =spark.conf.get("spark.sql.autoBroadcastJoinThreshold").toIntthreshold / 1024 / 1024参考SparkSQL之broadcast joinSpark-sql Join优化=>(cache+BroadCast)4、写MySQL慢

Spark df批量写MySQL很慢,如我900万条数据写需要5-10个小时解决办法:在url后面加上

&rewriteBatchedStatements=true

加上之后,写数据10分钟左右,快很多。

个人环境经验:MySQL不用加就没问题,MariaDB需要加,也就是不同的MySQL版本不一样

5、run at ThreadPoolExecutor.java:1149

之前就在Spark Web UI经常看到这个描述,但不知道是干啥,现在在总结上面的broadcast join发现了规律:当两个表join,如果为BroadcastHashJoin则有这个描述,如果为SortMergeJoin则没有。BroadcastHashJoin 用ThreadPool进行异步广播 源码见:BroadcastHashJoinExec和BroadcastExchangeExec参考:What are ThreadPoolExecutors jobs in web UI's Spark Jobs?

免责声明:非本网注明原创的信息,皆为程序自动获取互联网,目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责;如此页面有侵犯到您的权益,请给站长发送邮件,并提供相关证明(版权证明、身份证正反面、侵权链接),站长将在收到邮件12小时内删除。

一周热门

查看更多