【王喆-推荐系统】特征工程篇-(task2)用Spark进行特征处理

网友投稿 804 2022-05-29

学习心得

(1)本次task学习了推荐系统中特征处理的主要方式,并利用 Spark 实践了类别型特征和数值型特征的主要处理方法,深度学习和传统机器学习的区别并不大,TensorFlow、PyTorch 等深度学习平台也提供了类似的特征处理函数。

(2)其中几个特征处理API:

Normalizer,是范式归一化操作,保证归一化之后范式为1

StandardScaler,是标准差归一化操作,保证归一化之后均值为0标准差为1

RobustScaler,是使用分位数进行鲁棒归一化操作,可以有效减少异常值的干扰

MinMaxScaler,是使用最大值和最小值进行归一化操作。

(3)Spark 的计算过程:Stage 内部数据高效并行计算,Stage 边界处进行消耗资源的 shuffle 操作或者最终的 reduce 操作。

注意:OneHotEncoderEstimator()在PySpark 3.0.0及以上版本已经更改为 OneHotEncoder()。

文章目录

学习心得

零、背景引入

一、业界主流的大数据处理利器:Spark

1.1 Spark原理

1.2 一个具体栗子

二、如何利用 One-hot 编码处理类别型特征

2.1 One-hot编码

2.2 Sparrow系统栗子

2.3 Multiple编码

三、数值型特征的处理——归一化和分桶

3.1 解决特征的尺度相差过大

3.2 解决特征分布不均匀问题

3.3 YouTube的数值型特征处理

四、作业

五、答疑

Reference

零、背景引入

上次task学习了推荐系统要使用的常用特征——基本分为“用户行为”、“用户关系”、“属性标签”、“内容数据”、“场景信息”这五个类别。但这些原始的特征是无法直接提供给推荐模型使用的,因为

推荐模型本质上是一个函数,输入输出都是数字或数值型的向量

。像动作、喜剧、爱情、科幻这些电影风格,是怎么转换成数值供推荐模型使用的呢?用户的行为历史又是怎么转换成数值特征的呢?

类似的特征处理过程在数据量变大之后还会变得更加复杂,因为工业界的数据集往往都是 TB 甚至 PB 规模的,这在单机上肯定是没法处理的。那业界又是怎样进行海量数据的特征处理呢?

一、业界主流的大数据处理利器:Spark

1.1 Spark原理

Spark是业界主流的大数据处理利器。

分布式:指的是计算节点之间不共享内存,需要通过网络通信的方式交换数据。

Spark 是一个分布式计算平台。Spark 最典型的应用方式就是建立在大量廉价的计算节点上,这些节点可以是廉价主机,也可以是虚拟的 Docker Container(Docker 容器)。

Spark 的架构图中:

Spark 程序由 Manager Node(管理节点)进行调度组织

由 Worker Node(工作节点)进行具体的计算任务执行

最终将结果返回给 Drive Program(驱动程序)。

在物理的 Worker Node 上,数据还会分为不同的 partition(数据分片),可以说 partition 是 Spark 的基础数据单元。

Spark 计算集群能够比传统的单机高性能服务器具备更强大的计算能力,就是由这些成百上千,甚至达到万以上规模的工作节点并行工作带来的。

1.2 一个具体栗子

那在执行一个具体任务的时候,Spark 是怎么协同这么多的工作节点,通过并行计算得出最终的结果呢?这里我们用一个任务来解释一下 Spark 的工作过程。

一个具体任务过程:

(1)先从本地硬盘读取文件 textFile;

(2)再从分布式文件系统 HDFS 读取文件 hadoopFile;

(3)然后分别对它们进行处理;

(4)再把两个文件按照 ID 都 join 起来得到最终的结果。

在 Spark 平台上处理这个任务的时候,会将这个任务拆解成一个子任务 DAG(Directed Acyclic Graph,有向无环图),再根据 DAG 决定程序各步骤执行的方法。从图 2 中可以看到,这个 Spark 程序分别从 textFile 和 hadoopFile 读取文件,再经过一系列 map、filter 等操作后进行 join,最终得到了处理结果。

最关键的过程是要理解

哪些是可以纯并行处理的部分,哪些是必须 shuffle(混洗)和 reduce 的部分

:这里的 shuffle 指的是所有 partition 的数据必须进行洗牌后才能得到下一步的数据,最典型的操作就是图 2 中的 groupByKey 操作和 join 操作。以 join 操作为例,必须对 textFile 数据和 hadoopFile 数据做全量的匹配才可以得到 join 后的 dataframe(Spark 保存数据的结构)。而 groupByKey 操作则需要对数据中所有相同的 key 进行合并,也需要全局的 shuffle 才能完成。

与之相比,map、filter 等操作仅需要逐条地进行数据处理和转换,不需要进行数据间的操作,因此各 partition 之间可以完全并行处理。

在得到最终的计算结果之前,程序需要进行 reduce 的操作,从各 partition 上汇总统计结果,随着 partition 的数量逐渐减小,reduce 操作的并行程度逐渐降低,直到将最终的计算结果汇总到 master 节点(主节点)上。可以说,shuffle 和 reduce 操作的触发决定了纯并行处理阶段的边界。

注意:

(1)

shuffle 操作需要在不同计算节点之间进行数据交换,非常消耗计算、通信及存储资源

,因此 shuffle 操作是 spark 程序应该尽量避免的。

shuffle可以理解为一个串行操作,需要等到在此之前的并行工作完成之后才可以顺序开始。

(2)简述Spark 的计算过程:Stage 内部数据高效并行计算,Stage 边界处进行消耗资源的 shuffle 操作或者最终的 reduce 操作。

下面将应用Spark在推荐系统的特征处理上,用 Spark 处理我们的 Sparrow Recsys 项目的数据集。带着2个问题学习: 经典的特征处理方法有什么?Spark 是如何实现这些特征处理方法的?

二、如何利用 One-hot 编码处理类别型特征

2.1 One-hot编码

广义上来讲,所有的特征都可以分为两大类:

(1)第一类是类别、ID 型特征(以下简称类别型特征)。

拿电影推荐来说,电影的风格、ID、标签、导演演员等信息,用户看过的电影 ID、用户的性别、地理位置信息、当前的季节、时间(上午,下午,晚上)、天气等等,这些无法用数字表示的信息全都可以被看作是类别、ID 类特征。

(2)第二类是数值型特征,能用数字直接表示的特征就是数值型特征。

典型的包括用户的年龄、收入、电影的播放时长、点击量、点击率等。

特征处理的目的:把所有的特征全部转换成一个数值型的特征向量,对于数值型特征,这个过程非常简单,直接把这个数值放到特征向量上相应的维度上就可以了。但是对于类别、ID 类特征,怎么处理它们呢?

这里就要用到 One-hot 编码(也被称为独热编码),它是

将类别、ID 型特征转换成数值向量的一种最典型的编码方式

。它通过把所有其他维度置为 0,单独将当前类别或者 ID 对应的维度置为 1 的方式生成特征向量。

One-hot编码举栗:假设某样本有三个特征,分别是星期、性别和城市,我们用 [Weekday=Tuesday, Gender=Male, City=London] 来表示,用 One-hot 编码对其进行数值化的结果。

除了上面栗子的类别型特征外,ID 型特征也经常使用 One-hot 编码。

比如,在 SparrowRecsys 中,用户 U 观看过电影 M,这个行为是一个非常重要的用户特征,如何向量化这个行为呢?其实也是使用 One-hot 编码。假设,我们的电影库中一共有 1000 部电影,电影 M 的 ID 是 310(编号从 0 开始),那这个行为就可以用一个 1000 维的向量来表示,让第 310 维的元素为 1,其他元素都为 0。

2.2 Sparrow系统栗子

下面看看 SparrowRecsys 是如何利用 Spark 完成这一过程的。这里使用 Spark 的机器学习库 MLlib 来完成 One-hot 特征的处理。

其中,最主要的步骤是,先创建一个负责 One-hot 编码的转换器,OneHotEncoderEstimator,然后通过它的 fit 函数完成指定特征的预处理,并利用 transform 函数将原始特征转换成 One-hot 特征。实现思路大体上就是这样,具体的步骤可以参考下面给出的源码:

def oneHotEncoderExample(samples:DataFrame): Unit ={ //samples样本集中的每一条数据代表一部电影的信息,其中movieId为电影id val samplesWithIdNumber = samples.withColumn("movieIdNumber", col("movieId").cast(sql.types.IntegerType)) //利用Spark的机器学习库Spark MLlib创建One-hot编码器 val oneHotEncoder = new OneHotEncoderEstimator() .setInputCols(Array("movieIdNumber")) .setOutputCols(Array("movieIdVector")) .setDropLast(false) //训练One-hot编码器,并完成从id特征到One-hot向量的转换 val oneHotEncoderSamples = oneHotEncoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber) //打印最终样本的数据结构 oneHotEncoderSamples.printSchema() //打印10条样本查看结果 oneHotEncoderSamples.show(10) _(参考 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngineering__中的oneHotEncoderExample函数)_

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

【王喆-推荐系统】特征工程篇-(task2)用Spark进行特征处理

18

19

20

One-hot 编码也可以自然衍生成 Multi-hot 编码(多热编码)。比如,对于历史行为序列类、标签特征等数据来说,用户往往会与多个物品产生交互行为,或者

一个物品被打上多个标签,这时最常用的特征向量生成方式就是把其转换成 Multi-hot 编码

在 SparrowRecsys 中,因为每个电影都是有多个 Genre(风格)类别的,所以我们就可以用 Multi-hot 编码完成标签到向量的转换。

可以尝试着用 Spark 实现该过程,也可以参考 SparrowRecsys 项目中 multiHotEncoderExample 的实现。

2.3 Multiple编码

Multiple编码特征将多个属性同时编码到一个特征中。在推荐场景中,单个用户对哪些物品感兴趣的特征就是一种Multiple编码特征,如,表示某用户对产品1、产品2、产品3、产品4是否感兴趣,则这个特征可能有多个取值,如用户A对产品1和产品2感兴趣,用户B对产品1和产品4感兴趣,用户C对产品1、产品3和产品4感兴趣,则用户兴趣特征为

用户 UserInterests A [1, 2] B [1, 4] C [1, 3, 4]

1

2

3

4

Multiple编码采用类似oneHot编码的形式进行编码,根据物品种类数目,展成物品种类数目大小的向量,当某个用户感兴趣时,对应维度为1,反之为0,如下

用户 UserInterests A [1, 1, 0, 0] B [1, 0, 0, 1] C [1, 0, 1, 1]

1

2

3

4

如何使用Multiple编码呢?

我们将多个属性同时编码到同一个特征中,目的就是同时利用多个属性的特征。经过Multiple编码后的特征大小为[batch_size, num_items],记作U,构建物品items的Embedding矩阵,该矩阵维度为[num_items, embedding_size],记作V,将矩阵U和矩阵V相乘,我们就得到了大小为[batch_size, embedding_size]的多属性表示。

三、数值型特征的处理——归一化和分桶

为啥处理特征:一是特征的尺度,二是特征的分布。

3.1 解决特征的尺度相差过大

前者即防止特征尺度之间相距过大,比如在电影推荐中有两个特征,一个是电影的评价次数 fr(无上限),一个是电影的平均评分 fs。fr波动范围高于fs几个数量级,可能会完全掩盖fs作用,所以将两个特征尺度拉平到一个区域内(通常是[0, 1],即所谓的归一化)。

3.2 解决特征分布不均匀问题

归一化虽然能够解决特征取值范围不统一的问题,但无法改变特征值的分布。比如图 5 就显示了 Sparrow Recsys 中编号在前 1000 的电影平均评分分布。由于人们打分有“中庸偏上”的倾向,因此评分大量集中在 3.5 的附近,而且越靠近 3.5 的密度越大。这对于模型学习来说也不是一个好的现象,因为

特征的区分度并不高

经常会用分桶的方式来解决特征值分布极不均匀的问题。

分桶(Bucketing)

:将样本按照某特征的值从高到低排序,然后按照桶的数量找到分位数,将样本分到各自的桶中,再用桶 ID 作为特征值。

在 Spark MLlib 中,分别提供了两个转换器 MinMaxScaler 和 QuantileDiscretizer,来进行归一化和分桶的特征处理。它们的使用方法和之前介绍的 OneHotEncoderEstimator 一样,都是先用 fit 函数进行数据预处理,再用 transform 函数完成特征转换。下面的代码就是 SparrowRecSys 利用这两个转换器完成特征归一化和分桶的过程。

def ratingFeatures(samples:DataFrame): Unit ={ samples.printSchema() samples.show(10) //利用打分表ratings计算电影的平均分、被打分次数等数值型特征 val movieFeatures = samples.groupBy(col("movieId")) .agg(count(lit(1)).as("ratingCount"), avg(col("rating")).as("avgRating"), variance(col("rating")).as("ratingVar")) .withColumn("avgRatingVec", double2vec(col("avgRating"))) movieFeatures.show(10) //分桶处理,创建QuantileDiscretizer进行分桶,将打分次数这一特征分到100个桶中 val ratingCountDiscretizer = new QuantileDiscretizer() .setInputCol("ratingCount") .setOutputCol("ratingCountBucket") .setNumBuckets(100) //归一化处理,创建MinMaxScaler进行归一化,将平均得分进行归一化 val ratingScaler = new MinMaxScaler() .setInputCol("avgRatingVec") .setOutputCol("scaleAvgRating") //创建一个pipeline,依次执行两个特征处理过程 val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler) val featurePipeline = new Pipeline().setStages(pipelineStage) val movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures) //打印最终结果 movieProcessedFeatures.show( _(参考 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngineering中的ratingFeatures函数)_

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

3.3 YouTube的数值型特征处理

在经典的 YouTube 深度推荐模型中,可以看到一些很有意思的处理方法。比如,在处理观看时间间隔(time since last watch)和视频曝光量(previous impressions)这两个特征时,YouTube 模型对它们进行归一化后,又将它们各自处理成了三个特征(图 6 中红框内的部分),分别是原特征值 x,特征值的平方x^2,以及特征值的开方,这又是为什么呢?

无论是平方还是开方操作,改变的还是这个特征值的分布,这些操作与分桶操作一样,都是希望

通过改变特征的分布,让模型能够更好地学习到特征内包含的有价值信息

。但由于我们没法通过人工的经验判断哪种特征处理方式更好,所以索性把它们都输入模型,让模型来做选择。

四、作业

(1)请你查阅一下 Spark MLlib 的编程手册,找出 Normalizer、StandardScaler、RobustScaler、MinMaxScaler 这个几个特征处理方法有什么不同。

Normalizer、StandardScaler、RobustScaler、MinMaxScaler 都是用让数据无量纲化

Normalizer: 正则化;(和Python的sklearn一样是按行处理,而不是按列[每一列是一个特征]处理,原因是:Normalization主要思想是对每个样本计算其p-范数,然后对该样本中每个元素除以该范数,这样处理的结果是使得每个处理后样本的p-范数(l1-norm,l2-norm)等于1。)针对每行样本向量:l1: 每个元素/样本中每个元素绝对值的和,l2: 每个元素/样本中每个元素的平方和开根号,lp: 每个元素/每个元素的p次方和的p次根,默认用l2范数。

StandardScaler:数据标准化; ( x i − u ) / σ (xi - u) / σ (xi−u)/σ 【u:均值,σ:方差】当数据(x)按均值(μ)中心化后,再按标准差(σ)缩放,数据就会服从为均值为0,方差为1的正态分布(即标准正态分布)。

RobustScaler: ( x i − m e d i a n ) / I Q R (xi - median) / IQR (xi−median)/IQR 【median是样本的中位数,IQR是样本的 四分位距:根据第1个四分位数和第3个四分位数之间的范围来缩放数据】

MinMaxScaler:数据归一化, ( x i − m i n ( x ) ) / ( m a x ( x ) − m i n ( x ) ) (xi - min(x)) / (max(x) - min(x)) (xi−min(x))/(max(x)−min(x)) ;当数据(x)按照最小值中心化后,再按极差(最大值 - 最小值)缩放,数据移动了最小值个单位,并且会被收敛到 [0,1]之间

(2)你能试着运行一下 SparrowRecSys 中的 FeatureEngineering 类,从输出的结果中找出,到底哪一列是我们处理好的 One-hot 特征和 Multi-hot 特征吗?以及这两个特征是用 Spark 中的什么数据结构来表示的呢?

答:One-hot特征是调用OneHotEncoderEstimator对movieId转换,生成了特征movieIdVector;

Multi-hot 特征是调用Vectors.sparse方法,对处理后的genreIndexes转换,生成vector。

这俩个特征都是稀疏向量表示(数据结构:SparseVector),不是稠密向量。

其中的数据分别是:(类别数量,索引数组,值数组)。索引数组长度必须等于值数组长度。

五、答疑

(1)对训练数据进行平方或者开方,是为了改变训练数据的分布。训练数据的分布被改变后,训练出来的模型岂不是不能正确拟合训练数据了。

答:本质上是改变了特征的分布,特征的分布和训练数据的分布没有本质的联系。只要你不改变训练数据label的分布,最终预测出的结果都应该是符合数据本身分布的。因为你要预测的是label,并不是特征本身。

Reference

(1)《深度学习推荐系统实战》,王喆

(2)王喆大佬的github:https://github.com/wzhe06

(3)Machine Learning Library (MLlib)

(4)https://www.codeleading.com/article/97252516619/#_OneHot_19

(5)http://spark.apache.org/docs/3.0.0/api/python/index.html

spark 推荐系统

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:YOLOX实战:超详细!手把手教你使用YOLOX进行物体检测(附数据集)
下一篇:Python【系列教程】之基础学习笔记
相关文章