Apache Spark with Python (5) - Collaborative Filtering: Building a Movie Recommendation System

Jimmy Huang
10 min read · Aug 21, 2020


This post is the capstone project of a Udemy MOOC. Link is here

Today's topic is collaborative filtering, which is, simply put, a kind of recommendation system (see this article). The previous post used the movie rating dataset for data analysis; this one focuses on how to run the big-data computation in the cloud on AWS EMR to recommend movies.

Explained in the plainest possible way, collaborative filtering answers the question "people who liked this movie also liked which other movies?": it first summarizes patterns in user behavior, then finds users similar to you and recommends content based on them.

For a detailed introduction to collaborative filtering, see this article. A plain-language example: I like Spider-Man and I also like Captain America, so the algorithm can recommend Captain America to other people who like Spider-Man. I won't dig deep into the algorithm itself here; the main approach is an item-item similarity matrix, using cosine similarity as the metric.

Source: https://ithelp.ithome.com.tw/articles/10219511

What we use here is item-based collaborative filtering; the general principle is the same. Stripped down, "similarity" is just a measure of how close two rating vectors are: cosine similarity in the code below, though other measures such as the inverse of the Euclidean distance are also common.
https://medium.com/x8-the-ai-community/recommendation-system-db51c868f13d
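
To make the formula concrete, here is a minimal sketch in plain Python (outside of Spark, with a made-up cosine helper and made-up rating vectors): cosine similarity is the dot product of the two movies' rating vectors divided by the product of their lengths, so movies rated in the same pattern score close to 1.

from math import sqrt

def cosine(xs, ys):
    # xs[i] and ys[i] are the ratings the same user gave to movie X and movie Y
    sum_xx = sum(x * x for x in xs)
    sum_yy = sum(y * y for y in ys)
    sum_xy = sum(x * y for x, y in zip(xs, ys))
    return sum_xy / (sqrt(sum_xx) * sqrt(sum_yy))

print(cosine([5, 3, 5], [4, 2, 5]))  # similar rating patterns -> about 0.99
print(cosine([5, 1, 4], [1, 5, 2]))  # very different patterns -> about 0.51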

Now that we have the algorithm, how do we actually run it? This is where AWS EMR comes in.

Amazon EMR is a distributed computing service built on clusters of AWS cloud virtual machines, used to analyze and process massive amounts of data. EMR clusters are managed with the open-source Hadoop framework, so users can focus on data processing and analysis instead of spending time configuring, managing, and tuning a Hadoop cluster, or worrying about how much compute capacity they need.

Simply put, EMR already comes with Spark and Hadoop set up. All you have to do is put your Spark code on EMR and your dataset on S3 (you can of course run everything locally as well, it is just slow...).

When running on EMR we again have to talk about partitions. Think of MapReduce, the core of distributed computing (like counting ballots on election night: polling stations across the whole country count in parallel, then the totals are aggregated). Spark will not pick a good partitioning for you, so you have to decide which operations should be partitioned, usually the expensive ones such as join, groupByKey, and reduceByKey.

Use .partitionBy() on an RDD before running a large operation that benefits from partitioning

As for the number you pass to .partitionBy(): too few partitions and it has no real effect; too many and you spend a lot of time shuffling, so this overhead is a trade-off. partitionBy(100) is usually a reasonable first guess; a short sketch of the pattern follows below.
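
A rough sketch of that pattern (the names partitioned and joined are just illustrative, and a SparkContext sc is assumed to exist; the full script appears later in this post): repartition the pair RDD once, check the result with getNumPartitions(), then run the expensive self-join on the partitioned RDD.

ratings = sc.textFile("s3n://{your-s3-domain}/ml-1m/ratings.dat") \
            .map(lambda l: l.split("::")) \
            .map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

partitioned = ratings.partitionBy(100)   # spread the keys across 100 partitions
print(partitioned.getNumPartitions())    # 100
joined = partitioned.join(partitioned)   # the expensive self-join reuses that partitioning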

Another thing: spark-submit --executor-memory 1g MovieSimilarities1M.py 260
This adds an extra argument, --executor-memory, followed by the amount of memory to give each executor. Quite a few errors come from running out of memory, so remember to allocate a bit more.
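
Equivalently (just an alternative sketch, not part of the original script), the executor memory can be set in code through SparkConf before the SparkContext is created; spark.executor.memory is the configuration key behind the --executor-memory flag.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("MovieSimilarities1M")
conf = conf.set("spark.executor.memory", "1g")   # same effect as --executor-memory 1g
sc = SparkContext(conf = conf)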

Some additional notes on the APIs and functions:

SPARK-JOIN
http://www.neilron.xyz/join-in-spark/

loadMovieNames(): builds a dictionary mapping movie IDs to movie names

makePairs: for a single user, produces a pair of movies and the ratings that user gave them, e.g. ("Captain America", "Spider-Man"), (5, 4)

filterDuplicates: the self-join produces duplicates, for example

("Captain America", "Spider-Man"), (5, 4) and ("Spider-Man", "Captain America"), (4, 5)

are the same pair, so we keep only one of them (the movie1 < movie2 condition also drops pairs where a movie is joined with itself).

computeCosineSimilarity:
computes the similarity of a movie pair from all of its rating pairs; this is just the cosine similarity formula.

The functions look like this:

import sys
from math import sqrt
from pyspark import SparkConf, SparkContext

def loadMovieNames():
    # Build a dictionary mapping movie ID -> movie name from movies.dat
    movieNames = {}
    with open("movies.dat") as f:
        for line in f:
            fields = line.split("::")
            movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
    return movieNames

def makePairs((user, ratings)):
    # ratings is a pair of (movieID, rating) tuples rated by the same user
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

def filterDuplicates((userID, ratings)):
    # The self-join emits each combination twice ((A,B) and (B,A)) plus (A,A) pairs;
    # keeping only movie1 < movie2 leaves exactly one copy of each distinct pair
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

def computeCosineSimilarity(ratingPairs):
    # Treat the two movies' ratings as vectors: cosine similarity is their dot
    # product divided by the product of their norms; numPairs counts how many
    # users rated both movies
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

With the functions defined, it's time to start transforming the RDDs!

conf = SparkConf()
sc = SparkContext(conf = conf)

print("\nLoading movie names...")
nameDict = loadMovieNames()

data = sc.textFile("s3n://{your-s3-domain}/ml-1m/ratings.dat")

# Map the raw data into an RDD with key = user ID, value = (movie ID, rating)
ratings = data.map(lambda l: l.split("::")).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))
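
As a quick sanity check (assuming the standard ml-1m format of UserID::MovieID::Rating::Timestamp), a raw line such as

1::1193::5::978300760

becomes the key-value pair (1, (1193, 5.0)): the key is the user ID, the value is (movie ID, rating), and the timestamp is simply ignored.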

# Emit every movie rated together by the same user.
# Self-join to find every combination.
ratingsPartitioned = ratings.partitionBy(100)
joinedRatings = ratingsPartitioned.join(ratingsPartitioned)

# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs).partitionBy(100)

# We now have (movie1, movie2) => (rating1, rating2)
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()

At this point, the data in moviePairRatings looks like

(movie1, movie2) => (rating1, rating2), (rating1, rating2)......

The key is the pair of movies, followed by a list of rating pairs (one from each user who rated both). For example:

Group A (Captain America, Iron Man) --> (5,4), (3,2), (5,5), (2,2)
Group B (Titanic, Final Destination) --> (5,1), (1,5), (4,2)

You can see at a glance that the two movies in group A are rated very similarly, while the ones in group B are not similar at all. Now we run the similarity computation (cosine similarity). Note that persist() is similar to cache(), but it lets you choose a storage level that can also spill to disk.

moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).persist()
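
If memory is tight, a variant (just a sketch, not part of the original script) is to pass an explicit storage level to persist() so the cached similarities can spill to disk:

from pyspark import StorageLevel

moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity) \
                                        .persist(StorageLevel.MEMORY_AND_DISK)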

# Save the results if desired
moviePairSimilarities = moviePairSimilarities.sortByKey()
moviePairSimilarities.saveAsTextFile("movie-sims")
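
Note that saveAsTextFile("movie-sims") writes a directory named movie-sims containing one part-xxxxx file per partition, not a single text file; on EMR you would typically point it at an S3 path (e.g. s3n://{your-s3-domain}/movie-sims) so the output survives after the cluster is shut down.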

The rest of the code just displays the results. The command-line argument picks one movie, and we look for the movies most similar to it. The thresholds set a quality bar: you don't want a pair of movies that a single user rated (5,5) to top the list just because of small-sample noise.

if (len(sys.argv) > 1):

    scoreThreshold = 0.97
    coOccurenceThreshold = 1000

    movieID = int(sys.argv[1])

    # Filter for movies with this sim that are "good" as defined by
    # our quality thresholds above
    filteredResults = moviePairSimilarities.filter(lambda((pair, sim)): \
        (pair[0] == movieID or pair[1] == movieID) \
        and sim[0] > scoreThreshold and sim[1] > coOccurenceThreshold)

    # Sort by quality score.
    results = filteredResults.map(lambda((pair, sim)): (sim, pair)).sortByKey(ascending = False).take(10)

    print("Top 10 similar movies for " + nameDict[movieID])
    for result in results:
        (sim, pair) = result
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = pair[0]
        if (similarMovieID == movieID):
            similarMovieID = pair[1]
        print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))
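
For example, submitting the script as spark-submit --executor-memory 1g MovieSimilarities1M.py 260 (movie ID 260 should correspond to Star Wars: Episode IV in the ml-1m movies.dat, assuming the standard MovieLens IDs) prints the ten most similar movies, each with its cosine score and its strength, i.e. the number of users who rated both movies.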

That's about it. Basically you can keep the default AWS EMR settings
(by default the job runs on a YARN cluster).
EMR is fairly expensive, so remember to shut the cluster down once the computation is done!

Series:
Apache Spark with Python (1) — Installation
Apache Spark with Python (2) — Concepts
Apache Spark with Python (3) — Hands-on
Apache Spark with Python (4) — Working with the Movie Dataset
Apache Spark with Python (5) — Collaborative Filtering: Building a Movie Recommendation System
