In [4]:
filamentData = [['filamentA', '100W', 605],
                ['filamentB', '100W', 683],
                ['filamentB', '100W', 691],
                ['filamentB', '200W', 561],
                ['filamentA', '200W', 530],
                ['filamentA', '100W', 619],
                ['filamentB', '100W', 686],
                ['filamentB', '200W', 600],
                ['filamentB', '100W', 696],
                ['filamentA', '200W', 579],
                ['filamentA', '200W', 520],
                ['filamentA', '100W', 622],
                ['filamentA', '100W', 668],
                ['filamentB', '200W', 569],
                ['filamentB', '200W', 555],
                ['filamentA', '200W', 541]]
In [5]:
filamentDataRDD = sc.parallelize(filamentData, 4)
filamentDataRDD.take(4)
Out[5]:
[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691],
 ['filamentB', '200W', 561]]
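The second argument to parallelize() sets the number of partitions; as a quick check (not part of the original session), it can be read back with getNumPartitions():

filamentDataRDD.getNumPartitions()   # expected to return 4, the value passed to parallelize()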
In [6]:
# Creating a Schema of a DataFrame
from pyspark.sql.types import *
FilamentTypeColumn = StructField("FilamentType", StringType(), True)
BulbPowerColumn = StructField("BulbPower", StringType(), True)
LifeInHoursColumn = StructField("LifeInHours", StringType(), True)
FilamentDataFrameSchema = StructType([FilamentTypeColumn, BulbPowerColumn, LifeInHoursColumn])
FilamentDataFrameSchema
Out[6]:
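The same schema can also be written as a single nested call; this is an equivalent, more compact form of the definition above. Note that LifeInHours is deliberately declared as a string here; it is cast to a float in a later step.

FilamentDataFrameSchema = StructType([
    StructField("FilamentType", StringType(), True),
    StructField("BulbPower", StringType(), True),
    StructField("LifeInHours", StringType(), True)])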
In [7]:
# Creating an RDD of Row Objects
from pyspark.sql import Row
filamentRDDofRows = filamentDataRDD.map(lambda x: Row(str(x[0]), str(x[1]), str(x[2])))
filamentRDDofRows.take(4)
Out[7]:
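The Rows above are positional, so the column names come only from the schema supplied in the next step. As a sketch of an alternative (the name filamentRDDofNamedRows is ours), Row also accepts keyword arguments that attach the field names directly; keep in mind that on Spark versions before 3.0, keyword-built Rows sort their fields alphabetically, which would change the column order.

filamentRDDofNamedRows = filamentDataRDD.map(
    lambda x: Row(FilamentType=str(x[0]), BulbPower=str(x[1]), LifeInHours=str(x[2])))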
In [8]:
# Creating a DataFrame
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
filamentDataFrameRaw = sqlContext.createDataFrame(filamentRDDofRows, FilamentDataFrameSchema)
filamentDataFrameRaw.take(4)
Out[8]:
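SQLContext works here, but on Spark 2.x the usual entry point is a SparkSession, with SQLContext kept mainly for backward compatibility. A minimal sketch, assuming Spark 2.x:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
filamentDataFrameRaw = spark.createDataFrame(filamentRDDofRows, FilamentDataFrameSchema)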
In [9]:
# Printing a Schema of a DataFrame
filamentDataFrameRaw.printSchema()
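With the schema defined above, the printed tree (reconstructed here, since the stdout output was not captured) should look like:

root
 |-- FilamentType: string (nullable = true)
 |-- BulbPower: string (nullable = true)
 |-- LifeInHours: string (nullable = true)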
In [10]:
# Changing the Data Type of a Column
filamentDataFrame = filamentDataFrameRaw.withColumn(
    'LifeInHours', filamentDataFrameRaw.LifeInHours.cast(FloatType()))
filamentDataFrame.printSchema()
In [11]:
filamentDataFrame.show(5)
In [154]:
# Summary statistics of the DataFrame
filamentSummary = filamentDataFrame.describe()
filamentSummary.show()
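describe() reports count, mean, stddev, min, and max; for the two string columns only count, min, and max are meaningful. It also accepts column names, so the numeric column can be summarized on its own (a sketch):

filamentDataFrame.describe('LifeInHours').show()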
In [12]:
filamentDataFrame.columns
Out[12]:
['FilamentType', 'BulbPower', 'LifeInHours']
In [13]:
# Filtering Data Where BulbPower Is 100W
filamentDataFrame100Watt = filamentDataFrame.filter(filamentDataFrame.BulbPower == '100W')
filamentDataFrame100Watt.show()
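filter() also accepts a SQL expression string, so the same selection can be written as (a sketch):

filamentDataFrame.filter("BulbPower = '100W'").show()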
In [14]:
# Selecting Data Where BulbPower Is 100W and LifeInHours Is Greater Than 650
filamentData100WGreater650 = filamentDataFrame.filter(
    (filamentDataFrame.BulbPower == '100W') & (filamentDataFrame.LifeInHours > 650.0))
filamentData100WGreater650.show()
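The parentheses around each condition are required because & binds more tightly than == and > in Python. An equivalent formulation chains two filter() calls (a sketch):

filamentDataFrame.filter(filamentDataFrame.BulbPower == '100W') \
    .filter(filamentDataFrame.LifeInHours > 650.0).show()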
In [142]:
# Performing Exploratory Data Analysis on a DataFrame
# Defining the DataFrame Schema
from pyspark.sql.types import *
FilamentTypeColumn = StructField("FilamentType", StringType(), True)
BulbPowerColumn = StructField("BulbPower", StringType(), True)
LifeInHoursColumn = StructField("LifeInHours", StringType(), True)
FilamentDataFrameSchema = StructType([FilamentTypeColumn, BulbPowerColumn, LifeInHoursColumn])
In [144]:
df = sqlContext.read.load(
    'file:///Users//KEVIN//Downloads//WorkArea//Python//pyspark//datafiles//filamentDataList.csv',
    format='com.databricks.spark.csv',
    header='true',
    schema=FilamentDataFrameSchema)
df.show(5)
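The format 'com.databricks.spark.csv' is the external spark-csv package written for Spark 1.x. On Spark 2.x the CSV reader is built in, so the same load can be written more directly (a sketch, using the same local path):

df = sqlContext.read.csv(
    'file:///Users//KEVIN//Downloads//WorkArea//Python//pyspark//datafiles//filamentDataList.csv',
    header=True, schema=FilamentDataFrameSchema)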
In [151]:
filamentDataFrame = sqlContext.read \
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") \
    .option("header", "true") \
    .schema(FilamentDataFrameSchema) \
    .load('file:///Users//KEVIN//Downloads//WorkArea//Python//pyspark//datafiles//filamentDataList.csv')
filamentDataFrame.show(5)
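The fully qualified class name above resolves to the same built-in CSV data source; the documented short alias is simply "csv", so an equivalent form is (a sketch):

filamentDataFrame = sqlContext.read.format("csv") \
    .option("header", "true") \
    .schema(FilamentDataFrameSchema) \
    .load('file:///Users//KEVIN//Downloads//WorkArea//Python//pyspark//datafiles//filamentDataList.csv')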
In [136]:
filamentDataFrame.printSchema()
In [137]:
# Calculating Summary Statistics
dataSummary = filamentDataFrame.describe()
dataSummary.show()
In [138]:
df = sqlContext.read.load(
    'file:///Users//KEVIN//Downloads//WorkArea//Python//pyspark//datafiles//filamentDataList.csv',
    format='com.databricks.spark.csv',
    header=True,
    inferSchema=False,
    schema=FilamentDataFrameSchema)
df.show(5)
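Because an explicit schema is supplied, inferSchema=False adds nothing here. If the schema argument were dropped and inferSchema=True used instead, Spark would scan the file and would likely type LifeInHours as an integer rather than a string (a sketch; dfInferred is our name):

dfInferred = sqlContext.read.load(
    'file:///Users//KEVIN//Downloads//WorkArea//Python//pyspark//datafiles//filamentDataList.csv',
    format='com.databricks.spark.csv',
    header=True, inferSchema=True)
dfInferred.printSchema()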
In [139]:
df.count()
Out[139]:
In [140]:
df.printSchema()
In [141]:
df.describe().show()
In [124]:
df.dtypes
Out[124]:
[('FilamentType', 'string'), ('BulbPower', 'string'), ('LifeInHours', 'string')]