What is WORD COUNT:

Word Count reads text files and counts how often each word occurs. The input is one or more text files, and the output is text files in which each line contains a word and the count of how often it occurred, separated by a tab.
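
For example, a word count output file might look like this (the words and counts below are purely illustrative):

spark	4
python	2
hdfs	1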

PYSPARK:

PySpark is the Python binding for the Spark platform and API, and it is not much different from the Java/Scala versions. Python is dynamically typed, so RDDs can hold objects of multiple types. PySpark does not yet support a few API calls, such as lookup and non-text input files, though these will be added in future releases.

SPARK:

Spark is an open source big data processing framework built around speed, ease of use, and sophisticated analytics. Spark can be up to 100x faster than Hadoop for large-scale data processing by exploiting in-memory computing and other optimizations. It has easy-to-use APIs for operating on large datasets, and it comes packaged with higher-level libraries, including support for SQL queries, streaming data, machine learning, and graph processing.

PYTHON:

Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built-in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.

In this tutorial we create a Spark application using Python that reads a file, counts the number of times each word occurs in the file, and ignores all empty lines.

Step-1: Enter the PySpark shell

( Open a terminal and type the following command )

pyspark

Step-2: Create a Spark Application

( First we import the SparkContext and SparkConf into pyspark )

from pyspark import SparkContext, SparkConf

Step-3: Create a configuration object and set the application name

conf = SparkConf().setAppName("Pyspark Pgm")

sc = SparkContext(conf = conf)

Step-4: Load data from HDFS

(i). First create a text file and load it into HDFS

Here is the example file:

Save the following into PySpark.txt

PySpark is the python binding for the Spark Platform and API and not much different from the Java/Scala versions. A good starting point is the official page i.e Examples | Apache Spark. Python is dynamically typed, so RDDs can hold objects of multiple types. PySpark does not yet support a few API calls, such as lookup and non-text input files, though these will be added in future releases.

Load the file into HDFS

hdfs dfs -put /home/geouser/Documents/PySpark.txt /user/geouser/

(ii). Next, load the PySpark.txt file (from HDFS) into pyspark

contentRDD = sc.textFile("hdfs://localhost:9000/user/geouser/PySpark.txt")

Step-5: Filter out the empty lines from the loaded file (PySpark.txt)

nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)

Step-6: Split the content based on space

words = nonempty_lines.flatMap(lambda x: x.split(' '))

Step-7: Count the words

wordcount = words.map(lambda x: (x, 1)) \
                 .reduceByKey(lambda x, y: x + y) \
                 .map(lambda x: (x[1], x[0])) \
                 .sortByKey(False)

Step-8: View the word counts

for word in wordcount.collect():
    print(word)

( Indent the print statement with 4 spaces )

Step-9: Save the Final Data to HDFS

wordcount.saveAsTextFile("hdfs://localhost:9000/user/geouser/Wordcount")

Description of the Commands:

from pyspark import SparkContext, SparkConf

   -> The first step of any Spark driver application is to create a SparkContext.

The SparkContext allows your Spark driver application to access the cluster through a resource manager. The resource manager can be YARN, or Spark’s cluster manager.

In order to create a SparkContext you should first create a SparkConf. The SparkConf stores configuration parameters that your Spark driver application will pass to SparkContext.

conf = SparkConf().setAppName("Pyspark Pgm")

-> setAppName() gives your Spark driver application a name so you can identify it in the Spark or YARN UI.
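
setAppName() is not the only setting a SparkConf can carry. A minimal sketch of a fuller configuration (the local[*] master URL and the executor memory value below are illustrative assumptions, not part of this tutorial's cluster setup):

conf = SparkConf().setAppName("Pyspark Pgm") \
                  .setMaster("local[*]") \
                  .set("spark.executor.memory", "1g")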

sc = SparkContext(conf = conf)

-> Creates the SparkContext from the configuration. Only one SparkContext can be active at a time; if a context might already exist, PySpark's SparkContext.getOrCreate(conf) can be used to reuse it instead of creating a new one:

sc = SparkContext.getOrCreate(conf)

contentRDD = sc.textFile("hdfs://localhost:9000/user/geouser/PySpark.txt")

sc.textFile() -> loads the file and returns its contents as an RDD of lines, which is assigned to contentRDD

"hdfs://localhost:9000/user/geouser/PySpark.txt" -> path of the HDFS file to be loaded
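
textFile() is not limited to HDFS paths; as a minimal sketch (the file:// path below simply reuses the local location from Step-4 and is only an illustration), it also accepts local files:

localRDD = sc.textFile("file:///home/geouser/Documents/PySpark.txt")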

nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)

-> Keeps only the lines whose length is greater than zero, i.e. omits the empty lines
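
A minimal sketch of the effect of this filter on a tiny hand-made RDD (the sample lines are illustrative):

sample = sc.parallelize(["PySpark", "", "Spark"])
sample.filter(lambda x: len(x) > 0).collect()
# ['PySpark', 'Spark'] – the empty string is dropped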

words = nonempty_lines.flatMap(lambda x: x.split(' '))

-> Splits each line into words on the space character (' ') and flattens the results into a single RDD of words
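
The reason for flatMap() rather than map() is the flattening; a minimal sketch on an illustrative one-line RDD:

sc.parallelize(["Spark is fast"]).map(lambda x: x.split(' ')).collect()
# [['Spark', 'is', 'fast']] – one list per line
sc.parallelize(["Spark is fast"]).flatMap(lambda x: x.split(' ')).collect()
# ['Spark', 'is', 'fast'] – individual words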

wordcount = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[1], x[0])).sortByKey(False)

-> Maps each word to a (word, 1) pair, sums the counts per word with reduceByKey, swaps each pair to (count, word), and sorts by count in descending order with sortByKey(False)
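
A minimal sketch of what each stage produces for a tiny, illustrative RDD of words:

tiny = sc.parallelize(["spark", "python", "spark"])
tiny.map(lambda x: (x, 1)).collect()
# [('spark', 1), ('python', 1), ('spark', 1)]
tiny.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collect()
# [('spark', 2), ('python', 1)] (order may vary)
tiny.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[1], x[0])).sortByKey(False).collect()
# [(2, 'spark'), (1, 'python')] – highest count first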

for word in wordcount.collect():

-> collect() brings every element of the RDD back to the driver program so the loop can iterate over the (count, word) pairs
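
Note that collect() pulls the whole result into driver memory; for large outputs a bounded preview is safer. A minimal sketch (the limit of 10 is an arbitrary choice):

for word in wordcount.take(10):
    print(word)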

print(word)

-> Prints each (count, word) pair

wordcount.saveAsTextFile("hdfs://localhost:9000/user/geouser/Wordcount")

saveAsTextFile() -> saves the word count output as text files in HDFS

"hdfs://localhost:9000/user/geouser/Wordcount" -> HDFS directory where the output should be saved. Spark writes the result as a directory of part files (e.g. part-00000), not as a single file.
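
For reference, the interactive steps above can also be collected into a single standalone script (saved, for example, as wordcount.py; the file name is arbitrary) and run with spark-submit. A minimal sketch reusing the same HDFS paths (note that saveAsTextFile will fail if the output directory already exists, so change the path or remove the old directory first):

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Pyspark Pgm")
sc = SparkContext(conf=conf)

contentRDD = sc.textFile("hdfs://localhost:9000/user/geouser/PySpark.txt")
nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)
words = nonempty_lines.flatMap(lambda x: x.split(' '))
wordcount = words.map(lambda x: (x, 1)) \
                 .reduceByKey(lambda x, y: x + y) \
                 .map(lambda x: (x[1], x[0])) \
                 .sortByKey(False)
wordcount.saveAsTextFile("hdfs://localhost:9000/user/geouser/Wordcount")
sc.stop()

( Run with: spark-submit wordcount.py )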