Apache Spark with Python (4) — Hands-On, Part 2

Jimmy Huang
8 min read · Aug 20, 2020


Continuing from the previous post, we keep applying RDDs to the movie dataset for some data processing.
RDDs are the relatively "low level" unit of work in Spark; later on you will find that DataFrames (the high-level API) are more convenient, but RDDs are a fundamental Spark concept and well worth understanding properly.
The most basic mental model: an RDD is key-value data, and the question is how to apply map/reduce transformations on top of it.

RDD transformations: map/reduce

map applies an operation element-wise over the whole list, while reduce combines everything into a single final result.
After chaining transformations you must call an action such as RDD.collect(): RDDs are lazily evaluated, so until an action is triggered, none of the preceding transformations actually run.
One more concept: when an RDD will be reused, call RDD.cache() on it;
otherwise the next time that RDD is needed, the whole chain of transformations is recomputed from scratch.
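
Here is a minimal toy sketch of those ideas (the data and the app name are made up purely for illustration; only collect() and count() trigger actual computation):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("LazyEvalDemo")
sc = SparkContext(conf=conf)

# A tiny toy RDD of (key, value) pairs -- illustrative data only
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# Transformations: nothing runs yet (lazy evaluation)
counts = pairs.reduceByKey(lambda x, y: x + y)

# cache() marks the RDD to be kept in memory once it has been computed,
# so later actions don't recompute the whole lineage from scratch
counts.cache()

# collect() is an action: only now does Spark actually run the job
print(counts.collect())   # e.g. [('a', 2), ('b', 1)]

# A second action reuses the cached result instead of recomputing
print(counts.count())     # 2

sc.stop()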

Below, the code is walked through line by line, with a comment after each relevant line.
Data format (movie ratings dataset):

user id | item id | rating | timestamp

from pyspark import SparkConf, SparkContext

def loadMovieNames():
    # Build a dictionary mapping movie ID -> movie name
    movieNames = {}
    with open("ml-100k/u.ITEM") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

conf = SparkConf().setMaster("local").setAppName("PopularMovies")
# Run locally on this machine for now; running on a cluster is covered in a later post
sc = SparkContext(conf=conf)

nameDict = sc.broadcast(loadMovieNames())
# broadcast ships this lookup table to every node so the whole cluster can see it

lines = sc.textFile("file:///SparkCourse/ml-100k/u.data")
# user id | item id | rating | timestamp
# 196    242    3    881250949
# 186    302    3    891717742
# Today's goal: find the movies that have been rated the most times
movies = lines.map(lambda x: (int(x.split()[1]), 1))
# After this map, the RDD becomes key-value pairs:
# key is the movie ID, value is always 1
# 242: 1
# 302: 1
movieCounts = movies.reduceByKey(lambda x, y: x + y)
# reduceByKey counts how many times each movie ID appears across the whole RDD
# 242: 120
# 302: 88
# It might end up looking like this: movie 242 was rated 120 times, movie 302 was rated 88 times
# Next we want to sort by the number of ratings,
# but sortByKey only sorts by key, not by value -- so we flip the pairs

flipped = movieCounts.map(lambda x: (x[1], x[0]))
# 120: 242
# 88 : 302

sortedMovies = flipped.sortByKey()
# Sorted by the number of ratings (which is now the key)

sortedMoviesWithNames = sortedMovies.map(lambda countMovie: (nameDict.value[countMovie[1]], countMovie[0]))
# Once sorted, raw movie IDs are hard to read, so map them to names using the broadcast dictionary

results = sortedMoviesWithNames.collect()

for result in results:
    print(result)
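
As an aside, PySpark's RDD API also offers sortBy, which sorts by an arbitrary key function, so the flip step can be skipped. A minimal sketch of that alternative, reusing the movieCounts and nameDict variables from the script above:

# Sort by the count (the value) directly, most-rated first
sortedByCount = movieCounts.sortBy(lambda x: x[1], ascending=False)

for movieID, count in sortedByCount.take(10):
    print(nameDict.value[movieID], count)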

Now let's show how to do the same processing with the high-level API (DataFrame):

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.ITEM") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Create a SparkSession (the config bit is only for Windows!)
# This works around some Spark quirks specific to Windows
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("PopularMovies").getOrCreate()

# Load up our movie ID -> name dictionary
nameDict = loadMovieNames()

# Get the raw data
lines = spark.sparkContext.textFile("file:///SparkCourse/ml-100k/u.data")

# Convert it to an RDD of Row objects
# (this is still an RDD, not a DataFrame yet)
movies = lines.map(lambda x: Row(movieID=int(x.split()[1])))
# Convert that to a DataFrame
movieDataset = spark.createDataFrame(movies)
# |movieID|
# +-------+
# |     50|
# |    258|
# |    100|
# At this point the DataFrame has only a single column
# Once the data is a DataFrame, the API feels a lot like pandas


# Some SQL-style magic to sort all movies by popularity in one line!
topMovieIDs = movieDataset.groupBy("movieID").count().orderBy("count", ascending=False).cache()
# The cache() here means topMovieIDs can be reused multiple times below without recomputation!!
# Show the results at this point:

#|movieID|count|
#+-------+-----+
#| 50| 584|
#| 258| 509|
#| 100| 508|

topMovieIDs.show()

# Grab the top 10
top10 = topMovieIDs.take(10)

# Print the results
print("\n")
for result in top10:
    # Each row has movieID, count as above.
    print("%s: %d" % (nameDict[result[0]], result[1]))

# Stop the session
spark.stop()

In practice, DataFrames are what you will use most of the time, and they are the main API supported going forward from Spark 3.0. The DataFrame API is also quite similar to pandas, so it should be easy to pick up.
To recap the steps for turning raw data into a DataFrame:
1. The data may come in as an RDD; use map to turn each record into a Row() object.
2. Use createDataFrame to convert that RDD into a DataFrame (a direct-read alternative is sketched below).
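
As an aside, you can also skip the RDD-to-Row step and let Spark read the file straight into a DataFrame. A minimal sketch, assuming the same tab-separated u.data file and an already-created SparkSession named spark as in the script above (the column names here are my own choice):

from pyspark.sql.types import StructType, StructField, IntegerType, LongType

# Explicit schema for the ratings file
schema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("movieID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
])

# Read the tab-separated ratings file directly as a DataFrame
ratings = spark.read.csv("file:///SparkCourse/ml-100k/u.data", sep="\t", schema=schema)

ratings.groupBy("movieID").count().orderBy("count", ascending=False).show(10)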

As a supplement: Spark SQL lets you register a DataFrame as a view and query it with SQL syntax:
df.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
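
A minimal self-contained sketch of that pattern (the people data and ages below are made up just to have something to query):

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("SparkSQLDemo").getOrCreate()

# Made-up sample data
people = spark.createDataFrame([
    Row(name="Alice", age=12),
    Row(name="Bob", age=15),
    Row(name="Carol", age=22),
])

# Register the DataFrame as a temporary view so SQL can reference it by name
people.createOrReplaceTempView("people")

teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()   # only Bob matches in this made-up data

spark.stop()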

That's about it for this post. The next one covers finding similarity between movies, i.e. how a recommendation system works, and running the computation on AWS services (EMR, S3).

Series:
Apache Spark with Python (1) — Installation
Apache Spark with Python (2) — Concepts
Apache Spark with Python (3) — Hands-On
Apache Spark with Python (4) — Movie Dataset Processing
Apache Spark with Python (5) — Collaborative Filtering: a movie recommendation system in practice
