
PySpark_Recipes
In [1]:
# export IPYTHON_OPTS="notebook"
# export XDG_RUNTIME_DIR=""
# pyspark
# http://localhost:8888/
In [2]:
pythonList = [2.3,3.4,4.3,2.4,2.3,4.0]
pythonList
Out[2]:
In [3]:
# collect() brings the entire RDD back to the driver, so it is not
# recommended in production; use it only while debugging.
# Distribute the data across two partitions.
parPythonData = sc.parallelize(pythonList, 2) # second argument: number of partitions
parPythonData.collect()
Out[3]:
In [4]:
parPythonData.first()
Out[4]:
In [5]:
parPythonData.take(2)
Out[5]:
In [6]:
parPythonData.getNumPartitions()
Out[6]:
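To see how those six values were actually split across the two partitions (a side check, not part of the original recipe), glom() returns one list per partition:
parPythonData.glom().collect()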
In [7]:
tempData = [59,57.2,53.6,55.4,51.8,53.6,55.4]
parTempData = sc.parallelize(tempData,2)
parTempData.collect()
Out[7]:
In [8]:
# Converting Temperature from Fahrenheit to Celsius
def fahrenheitToCentigrade(temperature):
    centigrade = (temperature - 32) * 5 / 9
    return centigrade
In [9]:
fahrenheitToCentigrade(59)
Out[9]:
In [10]:
parCentigradeData = parTempData.map(fahrenheitToCentigrade)
parCentigradeData.collect()
Out[10]:
In [11]:
# Filtering Temperatures of 13 °C or Higher
def tempMoreThanThirteen(temperature):
    return temperature >= 13
In [12]:
filteredTemperature = parCentigradeData.filter(tempMoreThanThirteen)
filteredTemperature.collect()
Out[12]:
In [13]:
# Alternative
filteredTemperature = parCentigradeData.filter(lambda x : x >= 13)
filteredTemperature.collect()
Out[13]:
Perform Basic Data Manipulation
In [14]:
studentMarksData = [["si1","year1",62.08,62.4],
["si1","year2",75.94,76.75],
["si2","year1",68.26,72.95],
["si2","year2",85.49,75.8],
["si3","year1",75.08,79.84],
["si3","year2",54.98,87.72],
["si4","year1",50.03,66.85],
["si4","year2",71.26,69.77],
["si5","year1",52.74,76.27],
["si5","year2",50.39,68.58],
["si6","year1",74.86,60.8],
["si6","year2",58.29,62.38],
["si7","year1",63.95,74.51],
["si7","year2",66.69,56.92]]
In [15]:
studentMarksDataRDD = sc.parallelize(studentMarksData, 4)
studentMarksDataRDD.take(2)
Out[15]:
In [16]:
# Calculating Average Semester Grades
studentMarksMean = studentMarksDataRDD.map(lambda x : [x[0], x[1], (x[2] + x[3]) / 2])
studentMarksMean.take(2)
Out[16]:
In [17]:
# Filtering Student Average Grades in the Second Year
secondYearMarks = studentMarksMean.filter(lambda x : "year2" in x)
secondYearMarks.take(2)
Out[17]:
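An equivalent, slightly more explicit filter (a sketch assuming the year label always sits at index 1) compares that field directly:
secondYearMarks = studentMarksMean.filter(lambda x : x[1] == "year2")
secondYearMarks.take(2)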
In [21]:
# Finding the Top Three Students
sortedMarksData = secondYearMarks.sortBy(keyfunc = lambda x : -x[2])
sortedMarksData.collect()
Out[21]:
In [24]:
sortedMarksData.take(3)
Out[24]:
In [25]:
# A more efficient alternative: takeOrdered() returns only the top N elements without sorting and collecting the whole RDD
topThreeStudents = secondYearMarks.takeOrdered(num=3, key=lambda x : -x[2])
topThreeStudents
Out[25]:
In [26]:
# Finding the Bottom Three Students
bottomThreeStudents = secondYearMarks.takeOrdered(num=3, key=lambda x : x[2])
bottomThreeStudents
Out[26]:
In [27]:
# Getting All Students with Averages Above 80%
moreThan80Marks = secondYearMarks.filter(lambda x : x[2] > 80)
moreThan80Marks.collect()
Out[27]:
In [28]:
# Run Set Operations
data2001 = ['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7']
data2002 = ['RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']
data2003 = ['RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']
# Parallelizing
parData2001 = sc.parallelize(data2001,2)
parData2002 = sc.parallelize(data2002,2)
parData2003 = sc.parallelize(data2003,2)
In [29]:
# Finding Projects Initiated over the Three Years
unionOf20012002 = parData2001.union(parData2002)
unionOf20012002.collect()
Out[29]:
In [30]:
allResearches = unionOf20012002.union(parData2003)
allResearches.collect()
Out[30]:
In [31]:
# Making Sets of Distinct Data
allUniqueResearches = allResearches.distinct()
allUniqueResearches.collect()
Out[31]:
In [32]:
# Counting Distinct Elements
allUniqueResearches.distinct().count()
Out[32]:
In [33]:
# Transformations can also be chained into a single expression
parData2001.union(parData2002).union(parData2003).distinct().count()
Out[33]:
In [34]:
# Finding Projects Completed in the First Year
firstYearCompletion = parData2001.subtract(parData2002)
firstYearCompletion.collect()
Out[34]:
In [35]:
# Finding Projects Completed in the First Two Years
unionTwoYears = parData2001.union(parData2002)
unionTwoYears.subtract(parData2003).collect()
Out[35]:
In [36]:
unionTwoYears.subtract(parData2003).distinct().collect()
Out[36]:
In [37]:
# Finding Projects Started in 2001 That Continued into 2002
projectsInTwoYear = parData2001.intersection(parData2002)
projectsInTwoYear.collect()
Out[37]:
In [38]:
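# Of those, keep only the projects that did not continue into 2003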
projectsInTwoYear.subtract(parData2003).distinct().collect()
Out[38]:
In [39]:
# Calculate Summary Statistics
airVelocityKMPH = [12,13,15,12,11,12,11]
parVelocityKMPH = sc.parallelize(airVelocityKMPH, 2)
In [40]:
# Getting the Number of Data Points
countValue = parVelocityKMPH.count()
countValue
Out[40]:
In [41]:
# Summing Air Velocities in a Day
sumValue = parVelocityKMPH.sum()
sumValue
Out[41]:
In [42]:
# Finding the Mean Air Velocity
meanValue = parVelocityKMPH.mean()
meanValue
Out[42]:
In [43]:
# Finding the Variance of Air Data
varianceValue = parVelocityKMPH.variance()
varianceValue
Out[43]:
In [44]:
# Calculating Sample Variance
sampleVarianceValue = parVelocityKMPH.sampleVariance()
sampleVarianceValue
Out[44]:
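For reference (not in the original notebook), variance() divides the summed squared deviations by n while sampleVariance() divides by n - 1, so the two differ only by a factor of n / (n - 1); a quick sanity check, assuming parVelocityKMPH as defined above:
n = parVelocityKMPH.count()
abs(parVelocityKMPH.variance() * float(n) / (n - 1) - parVelocityKMPH.sampleVariance()) < 1e-9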
In [45]:
# Calculating Standard Deviation
stdevValue = parVelocityKMPH.stdev()
stdevValue
Out[45]:
In [46]:
# Calculating Sample Standard Deviation
sampleStdevValue = parVelocityKMPH.sampleStdev()
sampleStdevValue
Out[46]:
In [47]:
# Calculating All Values in One Step Using stats()
parVelocityKMPH.stats()
Out[47]:
In [48]:
# The StatCounter result can be converted to a dictionary by using the asDict() function:
parVelocityKMPH.stats().asDict()
Out[48]:
In [49]:
# Individual statistics are also available through the methods defined on StatCounter
parVelocityKMPH.stats().mean()
Out[49]:
In [50]:
parVelocityKMPH.stats().stdev()
Out[50]:
In [51]:
parVelocityKMPH.stats().count()
Out[51]:
In [52]:
parVelocityKMPH.stats().min()
Out[52]:
In [53]:
parVelocityKMPH.stats().max()
Out[53]:
In [54]:
pythonList = ['b' , 'd', 'm', 't', 'e', 'u']
In [55]:
RDD1 = sc.parallelize(pythonList, 2)
RDD1.collect()
Out[55]:
In [66]:
def vowelCheckFunction(data):
    if data in ("a", "e", "i", "o", "u"):
        return 1
    else:
        return 0
In [67]:
vowelCheckFunction('a')
Out[67]:
In [68]:
vowelCheckFunction('b')
Out[68]:
In [97]:
RDD2 = RDD1.map( lambda data : (data, vowelCheckFunction(data)))
RDD2.collect()
Out[97]:
In [103]:
# Fetching Keys from a Paired RDD
RDD2Keys = RDD2.keys()
RDD2Keys.collect()
Out[103]:
In [102]:
# Fetching Values from a Paired RDD
RDD2Values = RDD2.values()
RDD2Values.collect()
Out[102]:
In [105]:
# Aggregate Data
filDataSingle = [['filamentA','100W',605],
['filamentB','100W',683],
['filamentB','100W',691],
['filamentB','200W',561],
['filamentA','200W',530],
['filamentA','100W',619],
['filamentB','100W',686],
['filamentB','200W',600],
['filamentB','100W',696],
['filamentA','200W',579],
['filamentA','200W',520],
['filamentA','100W',622],
['filamentA','100W',668],
['filamentB','200W',569],
['filamentB','200W',555],
['filamentA','200W',541]]
In [107]:
filDataSingleRDD = sc.parallelize(filDataSingle,2)
filDataSingleRDD.take(3)
Out[107]:
In [167]:
# Creating a Paired RDD
filDataPairedRDD1 = filDataSingleRDD.map(lambda x : (x[0], x[2]))
filDataPairedRDD1.take(4)
Out[167]:
In [174]:
# Finding the Mean Lifetime Based on Filament Type
filDataPairedRDD11 = filDataPairedRDD1.map(lambda x : (x[0], [x[1], 1]))
filDataPairedRDD11.take(4)
Out[174]:
In [175]:
filDataSumandCount = filDataPairedRDD11.reduceByKey(lambda l1, l2 : [l1[0] + l2[0], l1[1] + l2[1]])
filDataSumandCount.collect()
Out[175]:
In [176]:
filDataPairedRDD11.count()
Out[176]:
In [177]:
filDataPairedRDD11.getNumPartitions()
Out[177]:
In [178]:
filDataPairedRDD11.take(5)
Out[178]:
In [179]:
filDataSumandCount.collect()
Out[179]:
In [180]:
filDataMeanandCount = filDataSumandCount.map( lambda l : [l[0],float(l[1][0])/l[1][1],l[1][1]])
filDataMeanandCount.collect()
Out[180]:
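The same per-key mean can be computed without building the [value, 1] lists by hand; a minimal alternative sketch (not from the book, and the names filSumCount/meanByType are introduced here), assuming filDataPairedRDD1 still holds (filamentType, lifetime) pairs:
filSumCount = filDataPairedRDD1.aggregateByKey((0.0, 0),
                  lambda acc, v : (acc[0] + v, acc[1] + 1),     # fold one lifetime into (sum, count)
                  lambda a, b : (a[0] + b[0], a[1] + b[1]))     # merge partial (sum, count) pairs
meanByType = filSumCount.mapValues(lambda p : p[0] / p[1])      # (filamentType, mean lifetime)
meanByType.collect()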
In [181]:
# Finding the Mean Lifetime Based on Bulb Power
filDataPairedRDD2 = filDataSingleRDD.map(lambda x : (x[1], x[2]))
filDataPairedRDD2.take(4)
Out[181]:
In [182]:
filDataPairedRDD22 = filDataPairedRDD2.map(lambda x : (x[0], [x[1], 1]))
filDataPairedRDD22.take(4)
Out[182]:
In [183]:
powerSumandCount = filDataPairedRDD22.reduceByKey(lambda l1, l2 : [l1[0] + l2[0], l1[1] + l2[1]])
powerSumandCount.collect()
Out[183]:
In [193]:
meanandCountPowerWise = powerSumandCount.map(lambda val : [val[0], [float(val[1][0]) / val[1][1], val[1][1]]])
meanandCountPowerWise.collect()
Out[193]:
In [196]:
# Finding the Mean Lifetime Based on Filament Type and Power
filDataSingleRDD.take(4)
Out[196]:
In [197]:
# Note: this first attempt only wraps each full record in a single-element list;
# the next cell builds the composite (filament type, power) key properly
filDataComplexKeyData = filDataSingleRDD.map(lambda val : [(val[0], val[1], val[2])])
filDataComplexKeyData.take(4)
Out[197]:
In [200]:
filDataComplexKeyData = filDataSingleRDD.map( lambda val : [(val[0], val[1]),val[2]])
filDataComplexKeyData.take(4)
Out[200]:
In [201]:
filDataComplexKeyData1 = filDataComplexKeyData.map(lambda val : [val[0],[val[1],1]])
filDataComplexKeyData1.take(4)
Out[201]:
In [202]:
filDataComplexKeySumCount = filDataComplexKeyData1.reduceByKey(lambda l1, l2 : [l1[0]+l2[0], l1[1]+l2[1]])
filDataComplexKeySumCount.collect()
Out[202]:
In [204]:
filDataComplexKeyMeanCount = filDataComplexKeySumCount.map(lambda val : [val[0],[float(val[1][0])/val[1][1],val[1][1]]])
filDataComplexKeyMeanCount.collect()
Out[204]:
Join Data
In [205]:
studentData = [['si1','Robin','M'],
['si2','Maria','F'],
['si3','Julie','F'],
['si4','Bob', 'M'],
['si6','William','M']]
In [206]:
subjectsData = [['si1','Python'],
['si3','Java'],
['si1','Java'],
['si2','Python'],
['si3','Ruby'],
['si4','C++'],
['si5','C'],
['si4','Python'],
['si2','Java']]
In [207]:
# Creating a Paired RDD of Students and Subjects
studentRDD = sc.parallelize(studentData, 2)
studentRDD.take(4)
Out[207]:
In [210]:
studentPairedRDD = studentRDD.map(lambda val : (val[0],[val[1],val[2]]))
studentPairedRDD.take(4)
Out[210]:
In [211]:
subjectsPairedRDD = sc.parallelize(subjectsData, 2)
subjectsPairedRDD.take(4)
Out[211]:
In [213]:
# Performing an Inner Join
studentSubjectsInnerJoin = studentPairedRDD.join(subjectsPairedRDD)
studentSubjectsInnerJoin.collect()
Out[213]:
In [216]:
# Performing a Left Outer Join
studentSubjectsleftOuterJoin = studentPairedRDD.leftOuterJoin(subjectsPairedRDD)
studentSubjectsleftOuterJoin.collect()
Out[216]:
In [217]:
# Performing a Right Outer Join
studentSubjectsrightOuterJoin = studentPairedRDD.rightOuterJoin(subjectsPairedRDD)
studentSubjectsrightOuterJoin.collect()
Out[217]:
In [218]:
# Performing a Full Outer Join
studentSubjectsfullOuterJoin = studentPairedRDD.fullOuterJoin(subjectsPairedRDD)
studentSubjectsfullOuterJoin.collect()
Out[218]:
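A related operation worth knowing (a sketch, not part of the original recipes) is cogroup(), which gathers the values from both paired RDDs under each key in a single pass:
studentSubjectsCogroup = studentPairedRDD.cogroup(subjectsPairedRDD)
studentSubjectsCogroup.map(lambda kv : (kv[0], (list(kv[1][0]), list(kv[1][1])))).collect()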
In [219]:
# Calculate Page Rank
pageLinks = [['a' ,['b','c','d']],
['c', ['b']],
['b', ['d','c']],
['d', ['a','c']]]
pageRanks = [['a',1],
['c',1],
['b',1],
['d',1]]
In [220]:
# Function to Calculate Contributions
def rankContributions(uris, rank):
    numberOfUris = len(uris)
    rankContribution = float(rank) / numberOfUris
    newrank = []
    for uri in uris:
        newrank.append((uri, rankContribution))   # each linked page gets an equal share of the rank
    return newrank
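# Quick check (not from the book): a page with rank 1 and three outgoing links
# passes one third of its rank to each of them.
rankContributions(['b', 'c', 'd'], 1)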
In [221]:
# Creating Paired RDDs
pageLinksRDD = sc.parallelize(pageLinks, 2)
pageLinksRDD.collect()
Out[221]:
In [222]:
pageRanksRDD = sc.parallelize(pageRanks, 2)
pageRanksRDD.collect()
Out[222]:
In [238]:
numIter = 20
s = 0.85                                     # damping factor
for i in range(numIter):
    linksRank = pageLinksRDD.join(pageRanksRDD)
    contributedRDD = linksRank.flatMap(lambda x : rankContributions(x[1][0], x[1][1]))
    sumRanks = contributedRDD.reduceByKey(lambda v1, v2 : v1 + v2)
    pageRanksRDD = sumRanks.map(lambda x : (x[0], (1 - s) + s * x[1]))
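# Each pass applies the damped PageRank update: new_rank(p) = (1 - s) + s * sum of
# rank(q) / outDegree(q) over all pages q that link to p; the ranks converge as the iterations proceed.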
In [241]:
#pageRanksRDD.collect()
I/O in PySpark
In [248]:
# Reading a Text File by Using the textFile() Function
playData = sc.textFile('C:/Users/KEVIN/Downloads/WorkArea/Python/dataFiles/shakespearePlays.txt',2)
playDataList = playData.collect()
In [249]:
type(playDataList)
Out[249]:
In [252]:
playDataList[0:4]
Out[252]:
In [268]:
# Reading a Text File by Using wholeTextFiles()
playData = sc.wholeTextFiles('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/shakespearePlays.txt',2)
playData.keys().collect()
Out[268]:
In [269]:
# fetch the content of the file:
playData.values().collect()
Out[269]:
In [270]:
# Counting the Number of Lines in a File
# Note: this returns 1 here, not the 4 shown in the book, because wholeTextFiles()
# yields one (file, content) record per file rather than one record per line.
playData.count()
Out[270]:
In [271]:
# Counting the Number of Characters on Each Line
pythonString = "My python"
len(pythonString)
Out[271]:
In [272]:
playDataLineLength = playData.map(lambda x : len(x))
playDataLineLength.collect()
# Book output (with playData read via textFile()): [21, 25, 22, 14]
# Here playData still holds (file, content) pairs from wholeTextFiles(), so the numbers differ.
Out[272]:
In [273]:
totalNumberOfCharacters = playDataLineLength.sum()
totalNumberOfCharacters
# Book output: 82 (the sum of the four line lengths above)
Out[273]:
In [274]:
# Write an RDD to a Simple Text File
# Counting the Number of Characters on Each Line
playData = sc.textFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/shakespearePlays.txt',4)
playDataLineLength = playData.map(lambda x : len(x))
playDataLineLength.collect()
Out[274]:
In [275]:
# Saving the RDD to a File
playDataLineLength.saveAsTextFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/savedData')
In [ ]:
# C:\Users\KEVIN\Downloads\WorkArea\Python\pyspark\pyspark-recipes-master\code_mishra\chapter6\dataFiles\savedData
# cat part-00000
In [280]:
# Read a Directory
manyFilePlayData = sc.textFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/manyFiles',4)
manyFilePlayData.collect()
Out[280]:
In [276]:
# Reading back the directory of saved part files
manyFilePlayData = sc.textFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/savedData',4)
manyFilePlayData.collect()
Out[276]:
In [281]:
# Reading a Directory by Using wholeTextFiles()
manyFilePlayDataKeyValue = sc.wholeTextFiles('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/manyFiles',4)
manyFilePlayDataKeyValue.collect()
Out[281]:
In [ ]:
# Read Data from HDFS
# filamentData.csv stored on HDFS (namenode at hdfs://localhost:9746)
filamentdata = sc.textFile('hdfs://localhost:9746/bookData/filamentData.csv',4)
filamentdata.take(4)
In [ ]:
# Save RDD Data to HDFS
playData = sc.textFile('/home/muser/bData/shakespearePlays.txt',4)
playDataLineLength = playData.map(lambda x : len(x))
playDataLineLength.collect()
# output: [21, 25, 22, 14]
In [ ]:
# Saving an RDD to HDFS
# Each file has a single data point because our RDD has four partitions.
playDataLineLength.saveAsTextFile('hdfs://localhost:9746/savedData/')
# hadoop fs -cat /savedData/part-00000
# hadoop fs -cat /savedData/part-00001
# hadoop fs -cat /savedData/part-00002
# hadoop fs -cat /savedData/part-00003
In [ ]:
# Read Data from a Sequential File
simpleRDD = sc.sequenceFile('hdfs://localhost:9746/sequenceFileToRead')
simpleRDD.collect()
In [282]:
# Write Data to a Sequential File
# Creating a Paired RDD
subjectsData = [('si1','Python'),
('si3','Java'),
('si1','Java'),
('si2','Python'),
('si3','Ruby'),
('si4','C++'),
('si5','C'),
('si4','Python'),
('si2','Java')]
In [ ]:
subjectsPairedRDD = sc.parallelize(subjectsData, 4)
subjectsPairedRDD.take(4)
In [ ]:
# Saving the RDD as a Sequence File
subjectsPairedRDD.saveAsSequenceFile('hdfs://localhost:9746/sequenceFiles')
# hadoop fs -ls /sequenceFiles
# the files have been saved in four parts
In [336]:
# Read a CSV File
# Writing a Python Function to Parse CSV Lines
import csv
import StringIO
def parseCSV(csvRow):
    data = StringIO.StringIO(csvRow)
    dataReader = csv.reader(data, lineterminator = '')
    return(dataReader.next())
In [337]:
csvRow = "p,s,r,p"
parseCSV(csvRow)
Out[337]:
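The parser above is Python 2 code; under Python 3 the StringIO module and the reader's next() method are gone. A roughly equivalent Python 3 sketch (parseCSV3 is a name introduced here):
import csv
import io

def parseCSV3(csvRow):
    data = io.StringIO(csvRow)
    dataReader = csv.reader(data, lineterminator = '')
    return(next(dataReader))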
In [338]:
# Reading the CSV file and creating an RDD of parsed rows
filamentRDD = sc.textFile('file:///Users/KEVIN/Downloads/WorkArea/Python/pyspark/datafiles/filamentDataList.csv',4 )
filamentRDDCSV = filamentRDD.map(parseCSV)
filamentRDDCSV.take(4)
Out[338]:
In [327]:
# Write an RDD to a CSV File
# Creating a Function to Convert a List into a String
import csv
import StringIO
def createCSV(dataList):
    data = StringIO.StringIO()
    dataWriter = csv.writer(data, lineterminator = '')
    dataWriter.writerow(dataList)
    return (data.getvalue())
In [328]:
listData = ['p', 'q', 'r', 's']
createCSV(listData)
Out[328]:
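As with the parser, this writer relies on the Python 2 StringIO module; a Python 3 sketch of the same conversion (createCSV3 is a name introduced here):
import csv
import io

def createCSV3(dataList):
    data = io.StringIO()
    dataWriter = csv.writer(data, lineterminator = '')
    dataWriter.writerow(dataList)
    return(data.getvalue())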
In [339]:
# Saving Data to a File
simpleData = [['p',20],
['q',30],
['r',20],
['m',25]]
In [340]:
simpleRDD = sc.parallelize(simpleData, 4)
simpleRDD.take(4)
Out[340]:
In [344]:
simpleRDDLines = simpleRDD.map(createCSV)
simpleRDDLines.take(4)
simpleRDDLines.saveAsTextFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/csvData/')
In [348]:
# Read a JSON File
# Creating a Function to Parse JSON Data
import json
def jsonParse(dataLine):
    parsedDict = json.loads(dataLine)
    valueData = parsedDict.values()
    return(valueData)
In [349]:
jsonData = '{"Time":"6AM", "Temperature":15}'
jsonParsedData = jsonParse(jsonData)
print(jsonParsedData)
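Under Python 3, dict.values() returns a view rather than a list, so the parsed values are usually wrapped explicitly; a sketch that behaves the same under both versions (jsonParse3 is a name introduced here):
import json

def jsonParse3(dataLine):
    parsedDict = json.loads(dataLine)
    return list(parsedDict.values())    # values() is a view in Python 3; make it a list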
In [350]:
# Reading the File
tempData = sc.textFile("file:///Users/KEVIN/Downloads/WorkArea/Python/pyspark/datafiles/tempData.json",4)
tempData.take(4)
Out[350]:
In [351]:
# Creating a Paired RDD
tempDataParsed = tempData.map(jsonParse)
tempDataParsed.take(4)
Out[351]:
In [352]:
# Write an RDD to a JSON File
# Creating a Function That Takes a List and Returns a JSON String
def createJSON(data):
    dataDict = {}
    dataDict['Name'] = data[0]
    dataDict['Age'] = data[1]
    return(json.dumps(dataDict))
In [353]:
nameAgeList = ['Arun', 22]
createJSON(nameAgeList)
Out[353]:
In [354]:
# Saving Data in JSON Format
nameAgeData = [['Arun',22],
['Bony',35],
['Juna',29]]
In [357]:
nameAgeRDD = sc.parallelize(nameAgeData, 3)
nameAgeRDD.collect()
Out[357]:
In [358]:
nameAgeJSON = nameAgeRDD.map(createJSON)
nameAgeJSON.collect()
Out[358]:
In [359]:
nameAgeJSON.saveAsTextFile('file:///Users/KEVIN/Downloads/WorkArea/Python/pyspark/datafiles/jsonDir/')
In [ ]: