Using PySpark to Demonstrate Spark Transformations and Actions on RDDs and Stage/DAG Evaluation - Part 3 (How to Avoid Shuffle Joins in PySpark)
In this part, let's demonstrate Spark transformations and actions on multiple RDDs using PySpark.
Let's consider two datasets:
dataset1=["10.1.0.1,1000","10.1.0.2,2000","10.1.0.3,3000"]*1000
dataset2=["10.1.0.1,hostname1","10.1.0.2,hostname2","10.1.0.3,hostname3"]
We want the final output to look like the following, aggregating the bytes from the first dataset when an IP address appears in duplicate records:
10.1.0.1,hostname1,1000000
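To make those numbers concrete, here is a plain-Python sketch (no Spark involved) of the aggregation we are after; because dataset1 repeats each record 1000 times, every per-IP byte count gets summed 1000 times over:
# Plain-Python sketch of the target aggregation, run against dataset1 above.
expected = {}
for record in dataset1:
    ip, size = record.split(",")
    expected[ip] = expected.get(ip, 0) + int(size)
print(expected)   # {'10.1.0.1': 1000000, '10.1.0.2': 2000000, '10.1.0.3': 3000000}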
Approach 1:
First, aggregate dataset1 by IP address and keep it as an RDD; next, create an RDD from the second dataset and join the two RDDs.
# Aggregate dataset1 by IP: split each record, parse the byte count, sum per key.
dataset1_rdd=sc.parallelize(dataset1).map(lambda x: x.split(",")).mapValues(lambda x: int(x)).reduceByKey(lambda x, y:x+y)
# Build an (IP, hostname) pair RDD from dataset2.
dataset2_rdd=sc.parallelize(dataset2).map(lambda x: tuple(x.split(",")))
# Join on the IP key and format each record as "IP,hostname,bytes".
for i in dataset1_rdd.join(dataset2_rdd).map(lambda x:str(x[0])+","+str(x[1][1])+","+str(x[1][0])).collect(): print(i)
10.1.0.1,hostname1,1000000
10.1.0.2,hostname2,2000000
10.1.0.3,hostname3,3000000
Now let's do the stage/DAG analysis.
Total number of stages: 3
Stage 0 breakdown:
Stage 1 breakdown:
Stage 2 breakdown:
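The Spark UI screenshots are not reproduced here, but a quick way to see the same shuffle boundaries from the PySpark shell is RDD.toDebugString(), which prints the lineage with an indented block for every shuffle dependency:
# Inspect the lineage of the joined RDD; each indented "+-" block sits behind
# a shuffle boundary, so both the reduceByKey and the join show up as shuffles.
joined_rdd = dataset1_rdd.join(dataset2_rdd)
print(joined_rdd.toDebugString())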
As seen above, there is a lot of shuffling in the join stage. We can avoid that shuffle by broadcasting the second dataset instead of joining two RDDs. Let's take a look at the second approach:
Approach 2:
dataset2=["10.1.0.1,hostname1","10.1.0.2,hostname2","10.1.0.3,hostname3"]
# Aggregate dataset1 by IP exactly as before.
dataset1_rdd=sc.parallelize(dataset1).map(lambda x: x.split(",")).mapValues(lambda x: int(x)).reduceByKey(lambda x, y:x+y)
# Collect the small IP -> hostname mapping to the driver as a plain dict ...
dataset2_rdd=sc.parallelize(dataset2).map(lambda x: tuple(x.split(","))).collectAsMap()
# ... and broadcast it so every executor gets a read-only copy.
dataset2_bd=sc.broadcast(dataset2_rdd)
# Look up the hostname from the broadcast dict instead of joining, then format the output.
for i in dataset1_rdd.map(lambda x: (x,dataset2_bd.value.get(x[0]))).map(lambda x: x[0][0]+","+x[1]+","+str(x[0][1]) ).collect(): print(i)
10.1.0.1,hostname1,1000000
10.1.0.2,hostname2,2000000
10.1.0.3,hostname3,3000000
Now let's analyse the stages/DAG:
Total number of stages: 3
Stage 0 breakdown:
Stage 1 breakdown:
Stage 2 breakdown:
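Again, the UI screenshots are not reproduced here, but the lineage itself confirms the difference; as a quick check from the shell:
# Inspect the lineage of the broadcast-based pipeline: the only shuffle left is
# the reduceByKey on dataset1_rdd; there is no join-related ShuffledRDD because
# the hostname lookup table travels to the tasks as a broadcast variable.
mapped_rdd = dataset1_rdd.map(lambda x: (x, dataset2_bd.value.get(x[0])))
print(mapped_rdd.toDebugString())
This matches the stage counts above: the remaining shuffle comes only from reduceByKey, so the join-related shuffle from Approach 1 is gone.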