Apache Spark with Python (3) - 實作篇1

Jimmy Huang
5 min readAug 15, 2020

--

看完第二篇後,這篇算是直接實作,相信大家還是對RDD是什麼很不明白,直接實作

(請先安裝好環境... Spark環境可能有點折騰)
首先下載數據集: 這是國外grouplens提供電影評分的數據集
https://grouplens.org/datasets/movielens/100k/

資料格式(共100k) user id | item id | rating | timestamp.

資料夾底下應該要有
1. ratings-counter.py(如下面區塊)
2. ml-100k(folder)

from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf) # 設定run在local端 不是cluster
lines = sc.textFile("ml-100k/u.data")
#PATH設在RUN的相對下比較不會出錯,lines就是大名鼎鼎的RDD
ratings = lines.map(lambda x: x.split()[2])
# 將RDD切為只要RATING那一欄位,這個步驟是"Transformation"
result = ratings.countByValue()
# SPARK函式: 統計RDD中各個Value出現的次數 "有點像是reduce"
# 後面這邊就是純python的語法了
# result = defaultdict(<class 'int'>, {'3': 27145, '1': 6110, '2': 11370, '4': 34174, '5': 21201})

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
print("%s %i" % (key, value))

最後 spark-submit ratings-counter.py即可

接下來講解第二篇範例,有大量的社交網路資料,ID|NAME|AGE|FRIENDS
今天想要得到的結果是,平均年齡的朋友量

----------------
0,Will,33,385
1,Jean,33,2
2.Hugh,55,221
--------------

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

def parseLine(line):
fields = line.split(',')
age = int(fields[2])
numFriends = int(fields[3])
return (age, numFriends)

lines = sc.textFile("file:/fakefriends.csv")
rdd = lines.map(parseLine)
#Output here:
33, 385
33, 2
55, 221
40, 465
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))# mapValues 保持key不變
(33, 385) -->(33, (385,1) )
(33,2 ) -->(33, (22,1) )
(55,221 ) -->(55, (221,1) )
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])# (33,(387,2))--> (33,193.5)results = averagesByAge.collect()
for result in results:
print(result)

程式碼的架構: 先載入RDD,然後再用transformation的方式得到新的RDD,但是注意,而mapValues常常用在你不會動到key值的時候
transformation又分成map and reduce

注意: 在call action前 這些RDD都不會進行轉換,這邊的actions就是.collect()

系列文:
Apache Spark with Python (1) — 安裝篇
Apache Spark with Python (2) — 概念篇
Apache Spark with Python (3) — 實作篇
Apache Spark with Python (4) — 電影資料集處理
Apache Spark with Python(5) — Collaborative filtering 電影資料集推薦系統實戰

--

--