Using PySpark to Demonstrate Spark Transformations and Actions on RDDs and Stage/DAG Evaluation - Part 3 (How to Avoid Shuffle Joins in PySpark)

In this part, let's demonstrate Spark transformations and actions on multiple RDDs using PySpark.

Let's consider two datasets:


dataset1=["10.1.0.1,1000","10.1.0.2,2000","10.1.0.3,3000"]*1000
dataset2=["10.1.0.1,hostname1","10.1.0.2,hostname2","10.1.0.3,hostname3"]

I wish to get the final output shown below by aggregating the bytes in the first dataset wherever an IP address is duplicated. Since each of the 1000 copies of dataset1 contributes 1000 bytes for 10.1.0.1, the aggregated value is 1000 * 1000 = 1000000:
10.1.0.1,hostname1,1000000

Approach-1:


First, aggregate dataset1 by IP address and keep the result as an RDD. Then create an RDD from the second dataset and join the two RDDs.


# Split each record into (ip, bytes), cast bytes to int, and sum the bytes per IP
dataset1_rdd=sc.parallelize(dataset1).map(lambda x: x.split(",")).mapValues(lambda x: int(x)).reduceByKey(lambda x, y: x+y)

# Build (ip, hostname) pairs for the lookup dataset
dataset2_rdd=sc.parallelize(dataset2).map(lambda x: tuple(x.split(",")))

# Join the two RDDs on the IP key and print "ip,hostname,total_bytes"
for i in dataset1_rdd.join(dataset2_rdd).map(lambda x: str(x[0])+","+str(x[1][1])+","+str(x[1][0])).collect(): print(i)

10.1.0.1,hostname1,1000000
10.1.0.2,hostname2,2000000
10.1.0.3,hostname3,3000000

Now let's do the stage/DAG analysis.


Total number of stages: 3
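
If you don't have the Spark UI open, a quick way to see where the shuffle boundaries (and hence the stage splits) fall is to print the RDD lineage. Below is a minimal sketch using the variables defined above; toDebugString() is a standard RDD method:

# Print the lineage of the joined RDD. Each indented "+-(N)" block in the
# output roughly marks a shuffle boundary, i.e. the start of a new stage.
joined_rdd = dataset1_rdd.join(dataset2_rdd)
print(joined_rdd.toDebugString())  # returns bytes on Python 3, so decode() it if needed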


Stage-0 breakdown:

Stage-1 breakdown:

Stage-2 breakdown:


As seen above, there is a lot of shuffling in the join stage. We can reduce this by broadcasting the second dataset instead of joining two RDDs, so let's take a look at the second approach.


Approach-2:


dataset1=["10.1.0.1,1000","10.1.0.2,2000","10.1.0.3,3000"]*1000
dataset2=["10.1.0.1,hostname1","10.1.0.2,hostname2","10.1.0.3,hostname3"]

# Split each record into (ip, bytes), cast bytes to int, and sum the bytes per IP
dataset1_rdd=sc.parallelize(dataset1).map(lambda x: x.split(",")).mapValues(lambda x: int(x)).reduceByKey(lambda x, y: x+y)

# Collect the small lookup dataset as a local dict and broadcast it to all executors
dataset2_map=sc.parallelize(dataset2).map(lambda x: tuple(x.split(","))).collectAsMap()
dataset2_bd=sc.broadcast(dataset2_map)

# Replace the join with a map-side lookup against the broadcast dict and print "ip,hostname,total_bytes"
for i in dataset1_rdd.map(lambda x: (x, dataset2_bd.value.get(x[0]))).map(lambda x: x[0][0]+","+x[1]+","+str(x[0][1])).collect(): print(i)

10.1.0.1,hostname1,1000000
10.1.0.2,hostname2,2000000
10.1.0.3,hostname3,3000000
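
One subtle difference from Approach-1 is worth noting: the RDD join is an inner join and silently drops any IP missing from dataset2, whereas the broadcast lookup returns None for a missing key and the string concatenation above would then fail. A minimal defensive variant (the "unknown" default is my own placeholder, not part of the original example):

# Fall back to a placeholder hostname for IPs absent from the broadcast map;
# alternatively, filter such records out to mimic the inner-join behaviour.
for i in dataset1_rdd.map(lambda x: x[0]+","+dataset2_bd.value.get(x[0], "unknown")+","+str(x[1])).collect(): print(i)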


Now let's analyse the stages/DAG:

Total number of stages: 3
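
Repeating the lineage check from Approach-1 on this pipeline should show only a single shuffle dependency, the one introduced by reduceByKey; the broadcast lookup is a plain map and adds no shuffle. A minimal sketch:

# Only one shuffle boundary (from reduceByKey) should appear in this lineage,
# confirming that the join shuffle from Approach-1 is gone.
result_rdd = dataset1_rdd.map(lambda x: (x, dataset2_bd.value.get(x[0])))
print(result_rdd.toDebugString())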



Breakdown for stage-0:

Breakdown for stage-1:

Breakdown for stage-2:
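
For completeness, on Spark versions that ship the DataFrame API the same map-side join can be requested with the broadcast() hint. The sketch below assumes a SparkSession named spark and is not part of the original RDD example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col, sum as sum_

spark = SparkSession.builder.getOrCreate()

# Rebuild the two datasets as (ip, bytes) and (ip, hostname) DataFrames
df1 = spark.createDataFrame([tuple(r.split(",")) for r in dataset1], ["ip", "bytes"]).withColumn("bytes", col("bytes").cast("long"))
df2 = spark.createDataFrame([tuple(r.split(",")) for r in dataset2], ["ip", "hostname"])

# Aggregate bytes per IP, then broadcast the small hostname table so the join
# runs as a broadcast hash join with no shuffle of the aggregated side
agg_df = df1.groupBy("ip").agg(sum_("bytes").alias("bytes"))
agg_df.join(broadcast(df2), "ip").show()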


