Spark Joins and Join Types

Spark supports the following join types:

  • Inner Join.
  • Left / Left Outer Join.
  • Right / Right Outer Join.
  • Outer / Full Join.
  • Cross Join.
  • Left Anti Join.
  • Left Semi Join.
  • Self Join.

If no join type is specified, Spark performs an inner join by default.

A join in Spark is a narrow transformation when it runs as a broadcast join; otherwise it is a wide transformation, because the data has to be shuffled.

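Create the student dataframe:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Reuse the active session if one exists, otherwise start a new one
sparkSession = SparkSession.builder.getOrCreate()

student_list = [(1, 'smith', "banglore"),
                (2, 'mark', "hyderabad"),
                (3, 'holder', "mumbai")]

student_schema = StructType([
    StructField('id', IntegerType()),
    StructField('name', StringType()),
    StructField('address', StringType())
])
student_df = sparkSession.createDataFrame(student_list, student_schema)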

Create the marks dataframe:

marks_list = [(1, 'math', 80),
              (1, 'science', 85),
              (1, 'computer', 90),
              (2, 'math', 78),
              (2, 'science', 92),
              (2, 'computer', 85),
              (4, 'math', 71),
              (4, 'science', 75),
              (4, 'computer', 97)]

# marks holds integers, so the field must be IntegerType;
# StringType would fail schema verification for values such as 80
marks_schema = StructType([
    StructField('id', IntegerType()),
    StructField('subject', StringType()),
    StructField('marks', IntegerType())
])

marks_df = sparkSession.createDataFrame(marks_list, marks_schema)

Join student dataframe with marks dataframe:

join_df = student_df.join(marks_df, ['id'], 'inner')
join_df.show()

Inner join output: 

+---+-----+---------+--------+-----+
| id| name|  address| subject|marks|
+---+-----+---------+--------+-----+
|  1|smith| banglore|    math|   80|
|  1|smith| banglore| science|   85|
|  1|smith| banglore|computer|   90|
|  2| mark|hyderabad|    math|   78|
|  2| mark|hyderabad| science|   92|
|  2| mark|hyderabad|computer|   85|
+---+-----+---------+--------+-----+

Left join of the student dataframe with the marks dataframe:

join_df = student_df.join(marks_df, ['id'], 'left')
join_df.show()

Left join output: 

+---+------+---------+--------+-----+
| id|  name|  address| subject|marks|
+---+------+---------+--------+-----+
|  1| smith| banglore|computer|   90|
|  1| smith| banglore| science|   85|
|  1| smith| banglore|    math|   80|
|  2|  mark|hyderabad|computer|   85|
|  2|  mark|hyderabad| science|   92|
|  2|  mark|hyderabad|    math|   78|
|  3|holder|   mumbai|    NULL| NULL|
+---+------+---------+--------+-----+

Right join of the student dataframe with the marks dataframe:

join_df = student_df.join(marks_df, ['id'], 'right')
join_df.show()

Right join output:

+---+-----+---------+--------+-----+
| id| name|  address| subject|marks|
+---+-----+---------+--------+-----+
|  1|smith| banglore|    math|   80|
|  1|smith| banglore| science|   85|
|  1|smith| banglore|computer|   90|
|  2| mark|hyderabad|    math|   78|
|  2| mark|hyderabad| science|   92|
|  2| mark|hyderabad|computer|   85|
|  4| NULL|     NULL|    math|   71|
|  4| NULL|     NULL| science|   75|
|  4| NULL|     NULL|computer|   97|
+---+-----+---------+--------+-----+

Cross join of the student dataframe with the marks dataframe:

cross_join_df = student_df.crossJoin(marks_df)
cross_join_df.show()

Output of cross join:

+---+------+---------+---+--------+-----+
| id|  name|  address| id| subject|marks|
+---+------+---------+---+--------+-----+
|  1| smith| banglore|  1|    math|   80|
|  1| smith| banglore|  1| science|   85|
|  1| smith| banglore|  1|computer|   90|
|  1| smith| banglore|  2|    math|   78|
|  1| smith| banglore|  2| science|   92|
|  1| smith| banglore|  2|computer|   85|
|  1| smith| banglore|  4|    math|   71|
|  1| smith| banglore|  4| science|   75|
|  1| smith| banglore|  4|computer|   97|
|  2|  mark|hyderabad|  1|    math|   80|
|  2|  mark|hyderabad|  1| science|   85|
|  2|  mark|hyderabad|  1|computer|   90|
|  2|  mark|hyderabad|  2|    math|   78|
|  2|  mark|hyderabad|  2| science|   92|
|  2|  mark|hyderabad|  2|computer|   85|
|  2|  mark|hyderabad|  4|    math|   71|
|  2|  mark|hyderabad|  4| science|   75|
|  2|  mark|hyderabad|  4|computer|   97|
|  3|holder|   mumbai|  1|    math|   80|
|  3|holder|   mumbai|  1| science|   85|
|  3|holder|   mumbai|  1|computer|   90|
|  3|holder|   mumbai|  2|    math|   78|
|  3|holder|   mumbai|  2| science|   92|
|  3|holder|   mumbai|  2|computer|   85|
|  3|holder|   mumbai|  4|    math|   71|
|  3|holder|   mumbai|  4| science|   75|
|  3|holder|   mumbai|  4|computer|   97|
+---+------+---------+---+--------+-----+
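
The list of join types also names left semi and left anti joins, which the examples above do not cover. In PySpark they are requested with the join type strings 'left_semi' and 'left_anti'. The sketch below is plain Python rather than Spark code; it only illustrates which student rows each of the two join types keeps, using the same sample data. The variable names are chosen for illustration.

```python
# Plain-Python illustration of left semi and left anti join semantics.
# This is not Spark code; it only mirrors which left-side rows are kept.
students = [(1, "smith", "banglore"),
            (2, "mark", "hyderabad"),
            (3, "holder", "mumbai")]
marks = [(1, "math", 80), (1, "science", 85), (1, "computer", 90),
         (2, "math", 78), (2, "science", 92), (2, "computer", 85),
         (4, "math", 71), (4, "science", 75), (4, "computer", 97)]

# Set of ids that appear on the right side
marks_ids = {row[0] for row in marks}

# Left semi: student rows with at least one match, each returned once,
# with only the student columns
left_semi = [s for s in students if s[0] in marks_ids]

# Left anti: student rows with no match on the right side
left_anti = [s for s in students if s[0] not in marks_ids]

print(left_semi)
print(left_anti)
```

Unlike an inner join, the semi join does not repeat a student once per matching marks row, and neither join returns any column from the marks side.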

Spark Join Strategies

Broadcast Hash Join:

When one dataset in a join is large and the other is small enough to fit in memory, Spark optimizes the join by using a broadcast hash join. The small dataset is sent to every executor, so the large dataset does not need to be shuffled.
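
The idea can be sketched in plain Python. This is a simplified, single-process model and not Spark's implementation: the function name broadcast_hash_join and the way partitions are represented as lists are assumptions made for illustration.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows):
    """Inner join on the first field of each row (simplified model)."""
    # Build the hash table once from the small side; this stands in for
    # the copy that would be broadcast to every executor.
    table = defaultdict(list)
    for key, *rest in small_rows:
        table[key].append(tuple(rest))

    joined = []
    # Each partition of the large side is probed where it already is,
    # so no rows of the large side are moved between partitions.
    for partition in large_partitions:
        for key, *rest in partition:
            for match in table.get(key, []):
                joined.append((key, *rest, *match))
    return joined

marks_partitions = [
    [(1, "math", 80), (2, "math", 78)],
    [(4, "math", 71), (1, "science", 85)],
]
students = [(1, "smith"), (2, "mark"), (3, "holder")]

result = broadcast_hash_join(marks_partitions, students)
print(result)
```

The row with id 4 has no matching student and is dropped, as expected for an inner join.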

The default value of spark.sql.autoBroadcastJoinThreshold is 10485760 bytes (10 MB), and we can increase or decrease it as required. To stop Spark from choosing a broadcast join automatically, set spark.sql.autoBroadcastJoinThreshold=-1; Spark then uses either a shuffle hash join or a shuffle sort-merge join.
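
As a rough illustration of how the threshold behaves, the plain-Python sketch below models the size check only. The helper can_auto_broadcast is hypothetical and is not part of Spark; the real planner also takes other factors into account, such as join hints and the join type.

```python
# Default of spark.sql.autoBroadcastJoinThreshold, in bytes (10 MB)
DEFAULT_THRESHOLD = 10 * 1024 * 1024

def can_auto_broadcast(estimated_size_bytes, threshold=DEFAULT_THRESHOLD):
    """Hypothetical helper: simplified model of the size check only."""
    # A negative threshold (-1) switches automatic broadcasting off
    return threshold >= 0 and estimated_size_bytes <= threshold

print(DEFAULT_THRESHOLD)
print(can_auto_broadcast(5 * 1024 * 1024))                 # 5 MB table, default threshold
print(can_auto_broadcast(20 * 1024 * 1024))                # 20 MB table, default threshold
print(can_auto_broadcast(5 * 1024 * 1024, threshold=-1))   # broadcasting disabled
```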

Shuffle Hash Join:

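In a shuffle hash join, Spark joins two large datasets by hashing the join keys of both datasets so that rows with the same key end up in the same partition on a single node. This join does not need the data to be sorted, but it is more expensive than a broadcast hash join because both datasets are shuffled.

Spark considers a shuffle hash join only when it is not told to prefer a sort-merge join and a broadcast join is not used, for example with these settings:

spark.sql.join.preferSortMergeJoin=false (the default value is true)

spark.sql.autoBroadcastJoinThreshold=-1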
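
The two phases, shuffle by key hash and then a per-partition hash join, can be sketched in plain Python. This is a simplified single-process model, not Spark's implementation; the function names and the choice of two partitions are assumptions made for illustration.

```python
from collections import defaultdict

def shuffle(rows, num_partitions):
    """Send each row to the partition chosen by the hash of its key."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[0]) % num_partitions].append(row)
    return parts

def shuffle_hash_join(left, right, num_partitions=2):
    """Inner join on the first field of each row (simplified model)."""
    # Phase 1: both sides are shuffled, so equal keys share a partition
    left_parts = shuffle(left, num_partitions)
    right_parts = shuffle(right, num_partitions)

    joined = []
    # Phase 2: inside each partition, build a hash table from one side
    # and probe it with the other; no sorting is needed
    for lpart, rpart in zip(left_parts, right_parts):
        table = defaultdict(list)
        for key, *rest in rpart:
            table[key].append(tuple(rest))
        for key, *rest in lpart:
            for match in table.get(key, []):
                joined.append((key, *rest, *match))
    return joined

students = [(1, "smith"), (2, "mark"), (3, "holder")]
marks = [(1, "math", 80), (2, "math", 78), (4, "math", 71)]

result = shuffle_hash_join(students, marks)
print(sorted(result))
```

The output order depends on how keys fall into partitions, which is why the result is sorted before printing.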

Shuffle Sort-Merge Join:

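In this join, both datasets are shuffled so that rows with the same key land in the same partition on a single node. Each partition is then sorted by the join key, and the two sorted sides are merged to produce the joined rows.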
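
The sort and merge steps can be sketched in plain Python. This is a simplified model of what happens inside one partition after the shuffle, not Spark's implementation; the function name sort_merge_join is an assumption made for illustration.

```python
def sort_merge_join(left, right):
    """Inner join on the first field of each row (simplified model)."""
    # Sort both sides by the join key
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])

    joined, i, j = [], 0, 0
    # Merge: advance whichever side currently has the smaller key
    while i < len(left) and j < len(right):
        lkey, rkey = left[i][0], right[j][0]
        if lkey < rkey:
            i += 1
        elif lkey > rkey:
            j += 1
        else:
            # Find the run of right rows that share this key
            j_end = j
            while j_end < len(right) and right[j_end][0] == lkey:
                j_end += 1
            # Pair every left row with this key with every row in the run
            while i < len(left) and left[i][0] == lkey:
                for r in right[j:j_end]:
                    joined.append(left[i] + r[1:])
                i += 1
            j = j_end
    return joined

students = [(3, "holder"), (1, "smith"), (2, "mark")]
marks = [(2, "math", 78), (1, "math", 80), (1, "science", 85), (4, "math", 71)]

result = sort_merge_join(students, marks)
print(result)
```

Because both sides are sorted, each side is scanned once during the merge and no hash table has to be held in memory.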
