Both Spark Core and SQL support fundamental types of joins, but the general difference between core Spark joins and Spark SQL joins is that the optimizer in core Spark isn’t able to re-order or pushdown filters, so we need to think about the ordering of operations when applying core Spark joins.
RDD Partition
In order to perform join on RDDs, the two RDDs should share a partitioner, otherwise, they will need to be shuffled.
Core Spark Join Type
There are 4 types of join functions defined on RDD, which are join (inner join), full outer join, left outer join and right outer join. These joins behave just like joins in normal SQL.
Pre-filter Before Join
The book gives an example that we have one RDD with data in the form (panda id, score) and another RDD with (panda id, address), and we want to figure out the highest score for each panda and send it to the corresponding address.
The first approach to do it is we just join two RDD on panda id and make reduce on panda id by comparing the values of the same key and keeping the large one.
The second approach is we first make reduce on the score RDD to get the highest score for each panda and then join it with address RDD.
The second approach will be much more efficient than the first one. Let’s assume for each panda, there will be k scores, then the size of shuffle we need to do in the first approach will be k times the size of shuffle in the second approach.
Execution plan
The default implementation of joins in core Spark is shuffled hash join (seems like it is wrong in the book since sort merge join is prefered in Spark 2 https://github.com/apache/spark/blob/v2.2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L101), which partitions the second dataset with the same default partitioner as the first one so that the keys with the same hash value for both datasets are in the same partition.
This approach always works, but it can be more expensive than necessary because we may able to avoid shuffle if:
- Both RDDs have a known partitioner
- One of the datasets is small enough to fit in memory, so we can do a broadcast hash join
Speeding up joins by assigning a known partitioner
If we have any operation requires a shuffle before the join, we can add a hash partitioner of the other dataset to the dataset we perform the operation on. By doing that, we can prevent doing shuffle in the join.
In the previous example, we have one RDD with (panda id, score) and the other with (panda id, address). In the pre-filtering process for score RDD, we can add a hash partitioner of address RDD as an argument to make the result to be in the same partition as address RDD, then we won’t need to do shuffle for the join anymore.
Speeding up joins using a broadcast hash join
If one RDD is small enough to fit in memory, we can just broadcast it to each partition of the other RDD to perform join.
Spark SQL Joins
Unlike core Spark, the optimizer in Spark SQL is able to do some heavy lifting like pushdown and reorder to make join more efficient. However, since you don’t control the partitioners, you may not be able to manually avoid shuffles as you do with core Spark joins.