Spark-Based Collaborative Filtering for Music Recommendation (Python Source Code Analysis)

December 20, 2018

I need to build a music recommender with Spark in a big-data setting, so this post records the process of learning Spark and the experiments I ran.

1. An Overall Understanding of Spark

Spark is a distributed computing engine: it dispatches large numbers of computing tasks to compute nodes so the work is carried out in a distributed way. Unlike Hadoop, Spark keeps intermediate results in memory, so on machines of the same configuration Spark runs many times faster than Hadoop. Spark's basic concepts include (a minimal code sketch follows the list):

Application: the Spark program written by the user

RDD (Resilient Distributed Dataset): can be thought of as the representation of data in distributed memory

Job: a Job consists of multiple RDDs and the various operations applied to those RDDs

Executor: the process responsible for running Tasks

Task: a unit of work that runs on an Executor

Worker (worker node): a node in the cluster that actually performs the computation

Driver: the Driver submits Jobs, turns them into Tasks, and coordinates Task scheduling across the Executor processes

SparkContext: the interface through which the user program talks to the cluster manager
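A minimal sketch of my own (not from the original post) showing how these concepts appear in a PySpark program: the user's Application creates a SparkContext, parallelize produces an RDD, and the action sum() is submitted as a Job whose Tasks run in Executor processes on the Worker nodes. The application name and local master are hypothetical.

from pyspark import SparkConf, SparkContext

# hypothetical application name and a local two-core "cluster"
conf = SparkConf().setAppName("ConceptDemo").setMaster("local[2]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(10))          # an RDD: data distributed across the workers
total = rdd.map(lambda x: x * 2).sum()   # a transformation plus an action, submitted as a Job
print(total)                             # 90
sc.stop()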

As shown in the figure below, the Driver creates a SparkContext, the SparkContext requests computing resources from the Cluster Manager and launches Executor processes, and the Tasks are then assigned to the Worker Nodes (two in the figure). The worker nodes also interact with storage systems such as HDFS and Hive to read and write data.

Figure 9-5: Spark runtime architecture

The figure below shows how these concepts relate to one another: one Driver can create multiple Jobs, each Job consists of multiple Stages, and each Stage contains multiple Tasks. When a user's Spark program runs, a Driver is created, computing resources are requested, the program code and data are distributed, and the Executors on the worker nodes then execute the different Tasks stage by stage.

Figure 9-6: Relationships among the Spark concepts

Spark's execution flow is shown in the figure below. The Driver creates a SparkContext, which builds the RDDs and the DAG (a directed acyclic graph reflecting the dependencies between RDDs) and then requests resources from the cluster manager. The cluster manager allocates compute nodes, the compute nodes ask the SparkContext for work, and the SparkContext decomposes the DAG into sets of tasks and distributes them to the compute nodes. The Executor processes on those nodes run the tasks; when the tasks finish they notify the SparkContext, results are written out, and the resources are released.

Figure 9-7: Basic Spark execution flow
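The flow above can be observed in a few lines of code. This is a sketch of my own (toy data, not the post's): the transformations flatMap and filter only record lineage and extend the DAG, while the action count() makes the SparkContext split the DAG into stages and tasks and send them to the executors.

from pyspark import SparkContext

sc = SparkContext("local[2]", "DAGDemo")              # hypothetical local setup
lines = sc.parallelize(["spark builds a DAG of RDDs", "an action triggers a job"])
words = lines.flatMap(lambda line: line.split(" "))   # transformation: recorded lazily
long_words = words.filter(lambda w: len(w) > 4)       # transformation: extends the DAG
print(long_words.count())                             # action: a Job is submitted and run as stages/tasks
sc.stop()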

2. Spark MLlib recommendation

The only recommendation algorithm in Spark MLlib is collaborative filtering based on matrix factorization: the rating matrix M formed by m users and n items is approximated by the inner products of two low-rank matrices A and B. The matrix factorization model it uses is FunkSVD.
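To make this concrete, the regularized least-squares objective the factorization minimizes can be written as below. The notation here is mine, not from the original post: K is the set of observed (user, item) pairs, a_u and b_i are the user and item latent factor vectors (of dimension rank), and \lambda is the regularization coefficient exposed as lambda_ in the Python API; Spark's implementation may scale the regularization term slightly differently.

\min_{A,B} \sum_{(u,i) \in K} \left( M_{ui} - a_u^{\top} b_i \right)^2
          + \lambda \Big( \sum_u \lVert a_u \rVert^2 + \sum_i \lVert b_i \rVert^2 \Big)

ALS (alternating least squares) solves this by fixing all b_i and solving a least-squares problem for each a_u, then fixing all a_u and solving for each b_i, repeating for the configured number of iterations.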

The Python interface to Spark MLlib's recommendation algorithm lives in the pyspark.mllib.recommendation package, which contains three classes: Rating, MatrixFactorizationModel, and ALS.

The Rating class, shown below, represents a single rating record: a user id, an item id, and a rating.

class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    """
    Represents a (user, product, rating) tuple.
    >>> r = Rating(1, 2, 5.0)
    >>> (r.user, r.product, r.rating)
    (1, 2, 5.0)
    >>> (r[0], r[1], r[2])
    (1, 2, 5.0)
    .. versionadded:: 1.2.0
    """

    def __reduce__(self):
        return Rating, (int(self.user), int(self.product), float(self.rating))
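As a quick sanity check (my own example values, matching the user-id, song-id, play-count format used later in this post), a raw CSV record can be turned into a Rating like this:

from pyspark.mllib.recommendation import Rating

line = "68622277,00005565,1"                  # hypothetical "user,song,count" record
fields = line.split(",")
r = Rating(int(fields[0]), int(fields[1]), float(fields[2]))
print(r.user, r.product, r.rating)            # 68622277 5565 1.0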

**The ALS class wraps the alternating least squares solver and trains a model from the tuples provided by the Rating class.** The FunkSVD factorization objective is optimized with ALS. The class has two methods, train and trainImplicit: train factorizes an explicit rating matrix directly, while trainImplicit trains the model from implicit user feedback (a small sketch of the implicit variant follows the class source below). This post only uses the explicit rating matrix.

The parameters of the train method are listed below (a short usage sketch follows the list):

1) ratings: the RDD holding the rating matrix.

2) rank: the dimensionality of the low-rank factors. The larger the rank, the more running time and memory the algorithm needs; typical values are 10-200.

3) iterations: the maximum number of iterations of the alternating least squares solver. It depends on the size and sparsity of the rating matrix; usually a small number such as 5-20 is enough. The default is 5.

4) lambda: the Python interface names it lambda_, because lambda is a reserved word in Python. This is the regularization coefficient of the FunkSVD factorization; it controls how tightly the model fits the data and improves generalization. The larger the value, the stronger the regularization penalty. Large recommender systems generally need to tune it.
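A short usage sketch (toy data of my own, not from the post) showing how these parameters are passed. train is used here because the ratings are explicit; trainImplicit takes the same arguments plus alpha.

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating

sc = SparkContext("local", "ALSTrainDemo")
ratings = sc.parallelize([Rating(1, 10, 5.0), Rating(1, 11, 1.0), Rating(2, 10, 4.0)])
model = ALS.train(ratings, rank=2, iterations=10, lambda_=0.01)   # explicit-feedback factorization
print(model.predict(2, 11))                                       # predicted rating of item 11 for user 2
sc.stop()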

class ALS(object):
    """Alternating Least Squares matrix factorization
    .. versionadded:: 0.9.0
    """
    # ...omitted...
    @classmethod
    @since("0.9.0")
    def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False,
              seed=None):
        """
        Train a matrix factorization model given an RDD of ratings by users
        for a subset of products. The ratings matrix is approximated as the
        product of two lower-rank matrices of a given rank (number of
        features). To solve for these features, ALS is run iteratively with
        a configurable level of parallelism.
        :param ratings:
          RDD of `Rating` or (userID, productID, rating) tuple.
        :param rank:
          Number of features to use (also referred to as the number of latent factors).
        :param iterations:
          Number of iterations of ALS.
          (default: 5)
        :param lambda_:
          Regularization parameter.
          (default: 0.01)
        :param blocks:
          Number of blocks used to parallelize the computation. A value
          of -1 will use an auto-configured number of blocks.
          (default: -1)
        :param nonnegative:
          A value of True will solve least-squares with nonnegativity
          constraints.
          (default: False)
        :param seed:
          Random seed for initial matrix factorization model. A value
          of None will use system time as the seed.
          (default: None)
        """
        model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations,
                              lambda_, blocks, nonnegative, seed)
        return MatrixFactorizationModel(model)

    @classmethod
    @since("0.9.0")
    def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01,
                      nonnegative=False, seed=None):
        """
        Train a matrix factorization model given an RDD of 'implicit
        preferences' of users for a subset of products. The ratings matrix
        is approximated as the product of two lower-rank matrices of a
        given rank (number of features). To solve for these features, ALS
        is run iteratively with a configurable level of parallelism.
        :param ratings:
          RDD of `Rating` or (userID, productID, rating) tuple.
        :param rank:
          Number of features to use (also referred to as the number of latent factors).
        :param iterations:
          Number of iterations of ALS.
          (default: 5)
        :param lambda_:
          Regularization parameter.
          (default: 0.01)
        :param blocks:
          Number of blocks used to parallelize the computation. A value
          of -1 will use an auto-configured number of blocks.
          (default: -1)
        :param alpha:
          A constant used in computing confidence.
          (default: 0.01)
        :param nonnegative:
          A value of True will solve least-squares with nonnegativity
          constraints.
          (default: False)
        :param seed:
          Random seed for initial matrix factorization model. A value
          of None will use system time as the seed.
          (default: None)
        """
        model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank,
                              iterations, lambda_, blocks, alpha, nonnegative, seed)
        return MatrixFactorizationModel(model)
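For completeness, a hedged sketch (my own toy data) of the implicit-feedback variant described above: trainImplicit treats the numbers as interaction strengths such as play counts rather than explicit ratings, and alpha controls the confidence weighting. The experiment in this post sticks to train.

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating

sc = SparkContext("local", "ImplicitALSDemo")
plays = sc.parallelize([Rating(1, 10, 3.0), Rating(1, 11, 1.0), Rating(2, 10, 7.0)])   # play counts
model = ALS.trainImplicit(plays, rank=2, iterations=10, lambda_=0.01, alpha=0.01)
print(model.predict(2, 11))   # a preference score, not a predicted play count
sc.stop()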

MatrixFactorizationModel is the model produced by ALS training, and it is what makes the predictions. Commonly used predictions include: the score of a given user for a given item, the N items a given user is most likely to like, the N users most likely to like a given item, the top-N items for every user, and the top-N users for every item (a short usage sketch follows the class source below).

class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):

    """A matrix factorisation model trained by regularized alternating
    least-squares.
    >>> r1 = (1, 1, 1.0)
    >>> r2 = (1, 2, 2.0)
    >>> r3 = (2, 1, 2.0)
    >>> ratings = sc.parallelize([r1, r2, r3])
    >>> model = ALS.trainImplicit(ratings, 1, seed=10)
    >>> model.predict(2, 2)
    0.4...
    >>> testset = sc.parallelize([(1, 2), (1, 1)])
    >>> model = ALS.train(ratings, 2, seed=0)
    >>> model.predictAll(testset).collect()
    [Rating(user=1, product=1, rating=1.0...), Rating(user=1, product=2, rating=1.9...)]
    >>> model = ALS.train(ratings, 4, seed=10)
    >>> model.userFeatures().collect()
    [(1, array('d', [...])), (2, array('d', [...]))]
    >>> model.recommendUsers(1, 2)
    [Rating(user=2, product=1, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
    >>> model.recommendProducts(1, 2)
    [Rating(user=1, product=2, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
    >>> model.rank
    4
    >>> first_user = model.userFeatures().take(1)[0]
    >>> latents = first_user[1]
    >>> len(latents)
    4
    >>> model.productFeatures().collect()
    [(1, array('d', [...])), (2, array('d', [...]))]
    >>> first_product = model.productFeatures().take(1)[0]
    >>> latents = first_product[1]
    >>> len(latents)
    4
    >>> products_for_users = model.recommendProductsForUsers(1).collect()
    >>> len(products_for_users)
    2
    >>> products_for_users[0]
    (1, (Rating(user=1, product=2, rating=...),))
    >>> users_for_products = model.recommendUsersForProducts(1).collect()
    >>> len(users_for_products)
    2
    >>> users_for_products[0]
    (1, (Rating(user=2, product=1, rating=...),))
    >>> model = ALS.train(ratings, 1, nonnegative=True, seed=10)
    >>> model.predict(2, 2)
    3.73...
    >>> df = sqlContext.createDataFrame([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)])
    >>> model = ALS.train(df, 1, nonnegative=True, seed=10)
    >>> model.predict(2, 2)
    3.73...
    >>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10)
    >>> model.predict(2, 2)
    0.4...
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = MatrixFactorizationModel.load(sc, path)
    >>> sameModel.predict(2, 2)
    0.4...
    >>> sameModel.predictAll(testset).collect()
    [Rating(...
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass
    .. versionadded:: 0.9.0
    """
    @since("0.9.0")
    def predict(self, user, product):
        """
        Predicts rating for the given user and product.
        """
        return self._java_model.predict(int(user), int(product))

    @since("0.9.0")
    def predictAll(self, user_product):
        """
        Returns a list of predicted ratings for input user and product
        pairs.
        """
        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
        first = user_product.first()
        assert len(first) == 2, "user_product should be RDD of (user, product)"
        user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1])))
        return self.call("predict", user_product)

    @since("1.2.0")
    def userFeatures(self):
        """
        Returns a paired RDD, where the first element is the user and the
        second is an array of features corresponding to that user.
        """
        return self.call("getUserFeatures").mapValues(lambda v: array.array('d', v))

    @since("1.2.0")
    def productFeatures(self):
        """
        Returns a paired RDD, where the first element is the product and the
        second is an array of features corresponding to that product.
        """
        return self.call("getProductFeatures").mapValues(lambda v: array.array('d', v))

    @since("1.4.0")
    def recommendUsers(self, product, num):
        """
        Recommends the top "num" number of users for a given product and
        returns a list of Rating objects sorted by the predicted rating in
        descending order.
        """
        return list(self.call("recommendUsers", product, num))

    @since("1.4.0")
    def recommendProducts(self, user, num):
        """
        Recommends the top "num" number of products for a given user and
        returns a list of Rating objects sorted by the predicted rating in
        descending order.
        """
        return list(self.call("recommendProducts", user, num))

    def recommendProductsForUsers(self, num):
        """
        Recommends the top "num" number of products for all users. The
        number of recommendations returned per user may be less than "num".
        """
        return self.call("wrappedRecommendProductsForUsers", num)

    def recommendUsersForProducts(self, num):
        """
        Recommends the top "num" number of users for all products. The
        number of recommendations returned per product may be less than
        "num".
        """
        return self.call("wrappedRecommendUsersForProducts", num)

    @property
    @since("1.4.0")
    def rank(self):
        """Rank for the features in this model"""
        return self.call("rank")

    @classmethod
    @since("1.3.1")
    def load(cls, sc, path):
        """Load a model from the given path"""
        model = cls._load_java(sc, path)
        wrapper = sc._jvm.org.apache.spark.mllib.api.python.MatrixFactorizationModelWrapper(model)
        return MatrixFactorizationModel(wrapper)
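Pulling the common calls together, a hedged sketch of how a trained model is typically used. It assumes model is a MatrixFactorizationModel and sc is a SparkContext, as in the earlier sketches; the save path is hypothetical, and the user/product ids are the example ids used in the experiment code below.

from pyspark.mllib.recommendation import MatrixFactorizationModel

score = model.predict(196, 242)                 # predicted score of product 242 for user 196
top10 = model.recommendProducts(196, 10)        # top-10 products for user 196, sorted by score
per_user = model.recommendProductsForUsers(10)  # RDD of (user, (Rating, ...)) with up to 10 items each
print(per_user.take(1))

model.save(sc, "hdfs:///tmp/als_model")         # hypothetical path
same_model = MatrixFactorizationModel.load(sc, "hdfs:///tmp/als_model")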

3. The Experiment Code

The training file train_data_wx.log used here lives on HDFS; each record is "user id, song id, play count". The program reads the training data, converts it into an RDD, trains a model with ALS's train method, and finally makes predictions with the model. Precision and recall still need to be computed afterwards to judge how good the model is (a sketch of such an evaluation is given after the results below).

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from math import sqrt

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating

if __name__ == "__main__":

    file_path = './train_data_wx.log'

    # set up environment
    conf = SparkConf() \
      .setAppName("SongALS")
    # sc = SparkContext(conf=conf)   # use this form when submitting to a cluster

    sc = SparkContext('local', 'testing')
    # read the CSV into an RDD; format: user id, song id, play count (used as the rating)
    user_data = sc.textFile(file_path)
    rates = user_data.map(lambda x: x.split(","))
    # convert each record of the RDD into a Rating
    ratings = rates.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
    numRatings = ratings.count()
    print(f"共有 {numRatings} 条记录!")
    numUsers = ratings.map(lambda r: r[0]).distinct().count()
    numSongs = ratings.map(lambda r: r[1]).distinct().count()
    
    print ("共有 %d 用户 %d 歌曲 %d 记录" % (numUsers, numSongs, numRatings))
    #取部分数据
    ratings = ratings.sample(withReplacement=False, fraction=0.4)

    # split into training and test sets
    trainset, testset = ratings.randomSplit([0.01, 0.99], 17)
    train_cnt, test_cnt = trainset.count(), testset.count()
    print(f'trainset size {train_cnt}, testset size {test_cnt}')

    # train the model (note: it is trained on the full sampled ratings,
    # so the RMSE values computed below are not a strict held-out evaluation)
    rank = 10
    iters = 5
    lambda_ = 0.02
    model = ALS.train(ratings=ratings, rank=rank, iterations=iters, lambda_=lambda_)

    # inspect the user / item latent factor vectors
    # model.productFeatures().first()
    # model.userFeatures().first()

    # model.recommendUsers(242, 100)   # recommend product 242 to the top 100 users
    # model.recommendProducts(196, 10) # recommend the top 10 products for user 196
    # model.predict(196, 242)          # predict one user's score for one product

    # recommend N songs for user X
    # ret01 = model.recommendProducts(5333266, 10)
    # print('Songs recommended for user 5333266', ret01)
    # recommend 10 songs for every user
    # ret = model.recommendProductsForUsers(10).collect()

    # predict the scores of many (user, product) pairs at once
    pred_input = trainset.map(lambda x: (x[0], x[1]))
    print(f'pred_input {pred_input.first()}')

    # predictAll returns Rating(user, product, prediction), e.g.
    # Rating(user=6033336, product=18983, rating=2.618844339959054)
    pred = model.predictAll(pred_input)
    print(f'pred {pred.first()}')
    # Rating(user=5333266, product=13033013, rating=141.31803975014338)

    # compute the model's loss on the training split
    true_reorg = trainset.map(lambda x: ((int(x[0]), int(x[1])), float(x[2])))
    print(f'true_reorg {true_reorg.take(10)}')
    # true_reorg (('68622277', '00005565'), '1')
    pred_reorg = pred.map(lambda x: ((x[0], x[1]), x[2]))
    print(f'pred_reorg {pred_reorg.take(10)}')
    # pred_reorg ((7093988, 13013115), 7.8259344808305045)

    # inner join: combine the two RDDs on the same (user, product) key
    true_pred = true_reorg.join(pred_reorg)
    print(true_pred.take(10))
    MSE = true_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    print(MSE)
    train_loss = sqrt(MSE)
    print(f'training RMSE {train_loss}')


    # compute the model's loss on the test split
    test_reorg = testset.map(lambda x: ((int(x[0]), int(x[1])), float(x[2])))
    # predictAll returns Rating(user, product, prediction)
    pred_test = model.predictAll(testset.map(lambda x: (x[0], x[1])))
    pred_test_reorg = pred_test.map(lambda x: ((x[0],x[1]), x[2]))
    print(pred_test_reorg.take(10))
    joined = test_reorg.join(pred_test_reorg)
    test_loss = sqrt(joined.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    print(f'test RMSE {test_loss}')


    # save the recommendations to a text file
    # ret.saveAsTextFile('recommendation_all.txt')
    # save the trained model
    model.save(sc, 'ALSmodel')

    
    

Output:

Note: different random initializations of the low-rank matrices produce different models and therefore different recommendations; in other words, each run of the algorithm may yield different recommendation results.

Total 7225681 records!
2487348 users, 135680 songs, 7225681 records
.......
Songs recommended for user 5333266
 [Rating(user=5333266, product=77001289, rating=196.85648434824077), Rating(user=5333266, product=13021210, rating=85.79594303230434), Rating(user=5333266, product=30000671, rating=52.32716316197924), Rating(user=5333266, product=43003130, rating=49.60144478622088), Rating(user=5333266, product=13045878, rating=48.9048573507505), Rating(user=5333266, product=13029740, rating=47.5545824117059), Rating(user=5333266, product=72914127, rating=46.985176730635715), Rating(user=5333266, product=18000204, rating=46.535270873531), Rating(user=5333266, product=14005531, rating=46.14877398716308), Rating(user=5333266, product=93028773, rating=45.77977031658572)]
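The post mentions that precision and recall still need to be computed to judge the model. Below is a hedged sketch of one common way to do that (my own code, not the author's). It reuses model and testset from the program above, treats the songs a user actually played in the test split as the relevant items, and compares them with the top-K recommendations; users who received fewer than K recommendations make the precision figure slightly pessimistic.

    K = 10

    # ground truth: user -> set of songs that appear in the test split
    truth = testset.map(lambda r: (r[0], r[1])).groupByKey().mapValues(set)

    # recommendations: user -> set of the top-K recommended songs
    recs = model.recommendProductsForUsers(K).mapValues(lambda rs: set(r.product for r in rs))

    # join on user id and count how many recommended songs were actually played
    joined = truth.join(recs)                              # (user, (relevant_set, recommended_set))
    hits = joined.mapValues(lambda t: len(t[0] & t[1]))

    precision_at_k = hits.values().sum() / float(K * joined.count())
    recall_at_k = joined.map(lambda kv: len(kv[1][0] & kv[1][1]) / float(len(kv[1][0]))).mean()
    print(precision_at_k, recall_at_k)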

