Using PySpark to demonstrate Spark transformations and actions on RDDs and stages/DAG evaluation - Part 1

Throughout the experiment, we will use the following connections information:

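logger = sc._jvm.org.apache.log4j  # assumption: logger is the log4j package reached through the py4j gateway
logger.LogManager.getLogger("org").setLevel(logger.Level.INFO)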

file: connections.csv


Q1. Find the sum of bytes sent by each srcIp.


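raw_rdd = sc.textFile("file:///data/connections.csv")
headers = raw_rdd.first()  # assumption: the first line of the file is the header row
filtered_rdd = raw_rdd.filter(lambda x: x != headers and x.strip())  # drop the header and blank lines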

//Transformations -> filter; No. of stages -> 1

for i in filtered_rdd.map(lambda x: (x.split(",")[0], int(x.split(",")[-1]))).reduceByKey(lambda x, y: x + y).collect():
    print("IP:{}, Bytes:{}".format(i[0], i[1]))
//Transformations -> map, reduceByKey; Actions -> collect()

IP:, Bytes:8000
IP:, Bytes:6000
IP:, Bytes:5000
IP:, Bytes:7000
IP:, Bytes:7000
IP:, Bytes:3000
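The Q1 pipeline splits every line twice inside the lambda. A minimal sketch of the same parsing as named helpers is shown here in plain Python (no Spark needed to try it). The header text and the sample rows are hypothetical, and the column layout (source first, bytes last) is assumed from the lambdas used in Q1.

```python
def is_data_line(line, header):
    """Mirror of the filter step: keep non-blank lines that are not the header."""
    return bool(line.strip()) and line != header


def parse_line(line):
    """Mirror of the map step: split once, return (srcIp, bytes)."""
    fields = line.split(",")
    return fields[0], int(fields[-1])


# Hypothetical file content; the real connections.csv is not reproduced here.
header = "srcIp,destIp,bytes"
lines = [header, "10.0.0.1,10.0.0.2,3000", "   ", "10.0.0.2,10.0.0.1,5000"]

pairs = [parse_line(l) for l in lines if is_data_line(l, header)]
print(pairs)
```

Under these assumptions the helpers could be passed straight to the RDD, for example filtered_rdd.map(parse_line), since Spark only needs a function that takes one line.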

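No. of stages: 3. One covers all the narrow transformations (reading the HadoopRDD, filter and map), which Spark pipelines together; one is created by the wide transformation (reduceByKey), which needs a shuffle; and the last one produces the results that collect() returns to the driver.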

It is interesting to note that stage-1 and stage-2 show shuffle read and shuffle write activity. This marks the shuffle boundary introduced by the wide transformation, which is why they run as separate stages from stage-0.
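To see why a wide transformation forces a stage boundary, here is a rough plain-Python imitation of what a reduceByKey-style aggregation has to do: combine inside each partition, move every key to a single bucket, then merge. It is only a sketch of the idea, not Spark's implementation; the partition contents, the IP values and NUM_REDUCERS are all hypothetical.

```python
from collections import defaultdict

# Hypothetical (srcIp, bytes) pairs spread over two input partitions.
partitions = [
    [("10.0.0.1", 1000), ("10.0.0.2", 500), ("10.0.0.1", 2000)],
    [("10.0.0.2", 1500), ("10.0.0.3", 700), ("10.0.0.1", 300)],
]
NUM_REDUCERS = 2  # hypothetical number of output partitions


def combine_locally(rows):
    """Map side: pre-aggregate inside one partition, no data movement."""
    acc = defaultdict(int)
    for key, value in rows:
        acc[key] += value
    return dict(acc)


def shuffle(combined_partitions, num_reducers):
    """Shuffle: route each key to exactly one bucket, chosen by its hash."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for part in combined_partitions:
        for key, value in part.items():
            buckets[hash(key) % num_reducers][key].append(value)
    return buckets


def merge(bucket):
    """Reduce side: add up the partial sums that arrived for each key."""
    return {key: sum(values) for key, values in bucket.items()}


combined = [combine_locally(p) for p in partitions]  # work before the boundary
buckets = shuffle(combined, NUM_REDUCERS)            # data crosses partitions
totals = {}
for bucket in buckets:                               # work after the boundary
    totals.update(merge(bucket))

print(totals)
```

The merge step cannot start for a key until every partition has handed over its partial sum for that key, which is the dependency that separates the two stages.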

Q2. How can you sort the above results?

for i in filtered_rdd.map(lambda x: (x.split(",")[0], int(x.split(",")[-1]))).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).collect():
    print("IP:{}, Bytes:{}".format(i[0], i[1]))
IP:, Bytes:8000
IP:, Bytes:7000
IP:, Bytes:7000
IP:, Bytes:6000
IP:, Bytes:5000
IP:, Bytes:3000

//Transformations -> map, filter, reduceByKey, sortBy; Actions -> collect()
//No. of stages: 3
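The sorted output above contains two entries with the same byte count. Sorting on the byte count alone does not say which of the two comes first. One possible way to make the order deterministic is a compound key, sketched here in plain Python on hypothetical (srcIp, bytes) pairs.

```python
# Hypothetical aggregated results; the IPs are placeholders.
totals = [
    ("10.0.0.1", 7000),
    ("10.0.0.2", 8000),
    ("10.0.0.3", 7000),
    ("10.0.0.4", 3000),
]

# Negating the byte count sorts it in descending order, while the IP
# string breaks ties in ascending order.
ranked = sorted(totals, key=lambda kv: (-kv[1], kv[0]))

for ip, total in ranked:
    print("IP:{}, Bytes:{}".format(ip, total))
```

The same key function should carry over to the RDD as sortBy(lambda x: (-x[1], x[0])), in which case ascending=False is no longer needed.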

Q3. Find the average bytes sent by each srcIp.

for i in filtered_rdd.map(lambda x: (x.split(",")[0], (int(x.split(",")[-1]), 1))).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).mapValues(lambda x: x[0] / x[1]).collect():
    print("IP:{}, Bytes:{}".format(i[0], i[1]))
IP:, Bytes:8000
IP:, Bytes:6000
IP:, Bytes:5000
IP:, Bytes:7000
IP:, Bytes:3500
IP:, Bytes:1500

//Transformations -> map, filter, reduceByKey, mapValues; Actions -> collect()
//No. of stages: 3
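The Q3 pipeline carries a (sum, count) pair per key and divides only at the end. The plain-Python sketch below, on hypothetical rows, shows why: partial averages cannot be merged pairwise, whereas (sum, count) pairs can be added in any grouping. The helper name add_pairs is illustrative only.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical (srcIp, bytes) records; the IPs are placeholders.
rows = [
    ("10.0.0.1", 4000), ("10.0.0.1", 3000),
    ("10.0.0.2", 1000), ("10.0.0.2", 2000), ("10.0.0.2", 6000),
]


def add_pairs(a, b):
    """Same merge function as the reduceByKey lambda in Q3."""
    return (a[0] + b[0], a[1] + b[1])


# map step: attach a count of 1 to every value
grouped = defaultdict(list)
for ip, nbytes in rows:
    grouped[ip].append((nbytes, 1))

# reduceByKey step, then mapValues step
sum_count = {ip: reduce(add_pairs, vals) for ip, vals in grouped.items()}
averages = {ip: s / c for ip, (s, c) in sum_count.items()}

# Averaging two partial averages gives a different, wrong answer:
naive = ((1000 + 2000) / 2 + 6000) / 2

print(averages, naive)
```

Note that in Python 3 the / operator returns a float, so the averages print as 3500.0 rather than 3500; // would give integer division if that is preferred.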

Q4. Find the Top 1 Interaction-> ?

for i in filtered_rdd.map(lambda x: ((x.split(",")[0], x.split(",")[1]), int(x.split(",")[-1]))).map(lambda x: (tuple(sorted(x[0])), x[1])).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).take(1):
    print("Top Interactions {} <-> {} : {}".format(i[0][0], i[0][1], i[1]))

//Top Interactions <-> : 8000
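The key step in Q4 is tuple(sorted(...)), which makes A -> B and B -> A land on the same key before the bytes are summed. A small plain-Python sketch of that normalisation follows; the rows are hypothetical, and the assumption that the first two columns are the two endpoints comes from the lambda in Q4.

```python
from collections import Counter

# Hypothetical CSV rows: first endpoint, second endpoint, bytes.
rows = [
    "10.0.0.1,10.0.0.2,3000",
    "10.0.0.2,10.0.0.1,5000",  # same pair, opposite direction
    "10.0.0.3,10.0.0.1,7000",
]

totals = Counter()
for line in rows:
    fields = line.split(",")
    # Sorting the two endpoints gives one canonical key per pair.
    pair = tuple(sorted((fields[0], fields[1])))
    totals[pair] += int(fields[-1])

# Equivalent of sortBy(..., ascending=False).take(1)
top_pair, top_bytes = totals.most_common(1)[0]
print("Top Interactions {} <-> {} : {}".format(top_pair[0], top_pair[1], top_bytes))
```

Without the sorted() step the two directions would be counted as separate keys (3000 and 5000), and the 7000-byte pair would be reported as the top interaction instead.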
