Posts

Showing posts from March, 2018

Using PySpark to demonstrate Spark transformations and actions on RDDs and stages/DAG evaluation - Part 3 (How to avoid shuffle joins in PySpark)

In this part, let's demonstrate Spark transformations and actions on multiple RDDs using PySpark. Consider the two datasets:

dataset1=["10.1.0.1,1000","10.1.0.2,2000","10.1.0.3,3000"]*1000
dataset2=["10.1.0.1,hostname1","10.1.0.2,hostname2","10.1.0.3,hostname3"]

The goal is to produce output of the following form, aggregating the bytes in the first dataset in case of duplicate records:

10.1.0.1,hostname1,100000

Approach 1: first aggregate dataset1 by IP address into an RDD, then create an RDD from the second dataset and join the two RDDs.

dataset1_rdd=sc.parallelize(dataset1).map(lambda x: x.split(",")).mapValues(lambda x: int(x)).reduceByKey(lambda x, y: x+y)
dataset2_rdd=sc.parallelize(dataset2).map(lambda x: tuple(x.split(",")))
for i in dataset1_rdd.join(dataset2_rdd).map(lambda x: str(x[0])+","+str(x[1][1])+","+str(x[1][0])).collect(): print i

10.1.0.1,hostname1,1000
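Since this post is about avoiding shuffle joins, a second approach worth sketching is a map-side (broadcast) join: broadcast the small dataset2 lookup table to every executor and resolve the hostname inside a map, so no join() shuffle is needed. This is a minimal sketch assuming the same SparkContext sc and the two lists above; the exact approach taken in the full post may differ.

# Approach 2 (sketch): broadcast the small IP -> hostname table to avoid a shuffle join.
dataset1 = ["10.1.0.1,1000", "10.1.0.2,2000", "10.1.0.3,3000"] * 1000
dataset2 = ["10.1.0.1,hostname1", "10.1.0.2,hostname2", "10.1.0.3,hostname3"]

# Ship the small lookup table to every executor once.
ip_to_host = sc.broadcast(dict(x.split(",") for x in dataset2))

result = (sc.parallelize(dataset1)
            .map(lambda x: x.split(","))        # [ip, bytes]
            .mapValues(int)                     # (ip, bytes)
            .reduceByKey(lambda x, y: x + y)    # aggregate bytes per ip
            .map(lambda kv: "%s,%s,%d" % (kv[0], ip_to_host.value.get(kv[0], "unknown"), kv[1])))

for line in result.collect():
    print(line)

Because the lookup table travels with the tasks, the only wide transformation left is the reduceByKey aggregation; the join stage and its shuffle disappear.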

Using PySpark to demonstrate Spark transformations and actions on RDDs and stages/DAG evaluation - Part 2

In this part, we use unstructured text to demonstrate Spark transformations and actions using PySpark.

file:///data/text.csv

myrdd=sc.textFile("file:///data/text.csv")
myrdd.take(2)
[u'Think of it for a moment \u2013 1 Qunitillion = 1 Million Billion! Can you imagine how many drives / CDs / Blue-ray DVDs would be required to store them? It is difficult to imagine this scale of data generation even as a data science professional. While this pace of data generation is very exciting, it has created entirely new set of challenges and has forced us to find new ways to handle Big Huge data effectively.', u'']

Q1: Convert all words in the RDD to lowercase and split the lines of the document on spaces.

myrdd.map(lambda x: x.lower().split()).take(2)
[[u'think', u'of', u'it', u'for', u'a', u'moment', u'\u2013', u'1', u'qunitillion', u'=', u'1', u'milli
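As a self-contained continuation of the same idea (not necessarily the next step in the original post), here is a small sketch that builds the RDD from an in-memory list instead of /data/text.csv and extends Q1 into a word count; the sample lines are made-up stand-ins for the file contents.

# Sketch: lowercase/split (Q1) followed by a simple word count.
lines = ["Think of it for a moment - 1 Quintillion = 1 Million Billion!",
         "It is difficult to imagine this scale of data generation."]
myrdd = sc.parallelize(lines)

# Q1-style transformation: lowercase each line and split on whitespace.
words_per_line = myrdd.map(lambda x: x.lower().split())

# Flatten to individual words and count occurrences with reduceByKey.
word_counts = (myrdd.flatMap(lambda x: x.lower().split())
                    .map(lambda w: (w, 1))
                    .reduceByKey(lambda a, b: a + b))

print(words_per_line.take(2))
print(word_counts.take(5))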

Using PySpark to demonstrate Spark transformations and actions on RDDs and stages/DAG evaluation - Part 1

Throughout the experiment, we will use the following connections information:

/usr/bin/pyspark
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.INFO)

file: connections.csv
srcIp,dstIp,srcPort,dstPort,protocol,bytes
1.1.1.1,10.10.10.10,11111,22,tcp,1000
1.1.1.1,10.10.10.10,22222,69,udp,2000
2.2.2.2,20.20.20.20,33333,21,tcp,3000
2.2.2.2,30.30.30.30,44444,69,udp,4000
3.3.3.3,30.30.30.30,44444,22,tcp,5000
4.4.4.4,40.40.40.40,55555,25,tcp,6000
5.5.5.5,50.50.50.50,66666,161,udp,7000
6.6.6.6,60.60.60.60,77777,162,tcp,8000

Q1. Find the sum of bytes sent by each srcIp.

headers=sc.textFile("file:///data/connections.csv").first()
filtered_rdd = sc.textFile("file:///data/connections.csv").filter(lambda x: x!=headers and x.strip())
# Transformations -> filter, number of stages -> 1
for i in filtered_rdd.map(lambda x: (x.split(",")[0], int(x.split(",")[-1]))).reduceByKey(lambda x
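The excerpt is cut off at the reduceByKey call; a hedged sketch of how Q1 could be completed (the original post may format the output differently) looks like this, reusing the headers/filtered_rdd lines above.

# Sketch: finish Q1 -- sum of bytes per srcIp.
headers = sc.textFile("file:///data/connections.csv").first()
filtered_rdd = sc.textFile("file:///data/connections.csv").filter(lambda x: x != headers and x.strip())

bytes_per_src = (filtered_rdd
                 .map(lambda x: (x.split(",")[0], int(x.split(",")[-1])))   # (srcIp, bytes)
                 .reduceByKey(lambda x, y: x + y))                          # sum bytes per srcIp

# filter/map are narrow transformations; reduceByKey introduces a shuffle boundary.
for src_ip, total_bytes in bytes_per_src.collect():
    print("%s,%d" % (src_ip, total_bytes))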

PyTest :: Python testing framework for unit testing, system testing, integration testing and everything else ...

PyTest rough points:
1. capsys -> The capsys builtin fixture provides two bits of functionality: it allows you to retrieve stdout and stderr from some code, and it disables output capture temporarily. Let's take a look at retrieving stdout and stderr (see the sketch after this list).
2. Marker -> can be specified in pytest.ini to mark tests.
3. Incremental -> if one test fails, we can ask all remaining tests to fail by using the pytest.mark.incremental decorator.
4. Python fixtures -> autouse, scope=[function, module, session, etc.] for dependency injection.
5. Python fixtures can take parameters as well.
6. Pytest can parallelise test cases using pytest-xdist, e.g. across multiple platforms or multiple nodes.
7. Fixtures can be used inside fixtures, e.g. a DB table existence check can use another fixture for the DB connection.
8. Within a fixture we can skip/fail test cases if that fixture function shows abnormal output/behaviour.
9. Assert -> pytest test case validation.
10. Conftest.py ->
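A minimal pytest sketch that exercises a few of the points above (capsys, a scoped fixture, parametrisation and plain assert); the file, fixture and test names are made up for illustration.

# test_sketch.py
import pytest

@pytest.fixture(scope="module")
def greeting():
    # A module-scoped fixture injected into tests by name (dependency injection).
    return "hello"

def test_capsys_captures_stdout(capsys, greeting):
    print(greeting)
    captured = capsys.readouterr()          # retrieve stdout/stderr captured so far
    assert captured.out.strip() == "hello"  # plain assert is pytest's validation mechanism

@pytest.mark.parametrize("value", [1, 2, 3])
def test_values_are_positive(value):
    assert value > 0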

Continuous Integration/Continuous Delivery components setup

Setting up GitLab, Jenkins and PyCharm IDE on Ubuntu 14.04 LTS [working links]
GitLab: https://linode.com/docs/development/version-control/install-gitlab-on-ubuntu-14-04-trusty-tahr/
Jenkins: https://vexxhost.com/resources/tutorials/how-to-install-configure-and-use-jenkins-on-ubuntu-14-04/
PyCharm IDE: https://www.jetbrains.com/pycharm/download/download-thanks.html?platform=mac&code=PCC

Live performance monitoring of distributed/cluster nodes using Metricbeat

Live performance monitoring is a critical part of the big data ecosystem. There are many ways to approach it; however, I feel Metricbeat is one of the best choices. Metricbeat always runs in conjunction with the EK (Elasticsearch and Kibana) stack. It is especially useful for platform components (Kubernetes, Docker, fluentd, hyperkube, etcd, HAProxy, etc.), for which Metricbeat has built-in monitoring support. Metricbeat also collects system statistics such as CPU, memory, IO and network stats and pushes them into Elasticsearch, from where we can visualise them using Kibana.

In my experiment, I set up Elasticsearch and Kibana as an isolated system (on 192.168.1.2) and Metricbeat on all the cluster nodes that require performance statistics monitoring.

Usage
Download the tar to all the cluster nodes from the link here: https://github.com/Indu-sharma/Utilities/blob/master/metricbeat.tar.gz
Extract the tar in a folder: mkdir /data/metr