前言
关于聚类的经典算法K-Means 算法在第一期就已经讲过了,需要回顾的同学点这里,现在顺着spark的api继续看下去,又看到聚类这一块,就着重讲一下基于流数据的K-Menas算法使用,因为最近比较忙的,所以感觉又一些东西都还没很完整的看完,比如回归的算法,还有聚类的一些统计学方法,这些都会在今后慢慢补上,给自己一个小目标,今年6月底把spark的api全部都过一遍,然后开始又针对地去做人脸识别相关的学习,还有神经网络和深度学习,在此之前希望我能打下较好的基础。
有点扯远了,看回今天的主题,Streaming k-means——基于流的k-means算法,其实就是在原有的k-means算法里面引入流的概念,并放在分布式的集群上去做,下面我们就来简单了解一下。
流式k-means算法
spark官网讲的挺清楚的,我就翻译下搬过来了,官网地址:点这里
当数据是以流的方式到达的时候,我们可能想动态地去估算(estimate)聚类的簇,通过新的到达的数据来更新它们。
spark.mllib
支持流式k-means
聚类,并且可以通过参数控制估计衰减(decay)( 或“忽略”(forgetfulness) )。 这个算法使用一般地小批量更新规则来更新簇。对每批新到的数据,我们首先将点分配给距离它们最近的簇,然后计算新的数据中心,最后更新每一个簇。使用的公式如下所示:
$$\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation}$$
$$\begin{equation} n_{t+1} = n_t + m_t \end{equation}$$
在上面的公式中,$c_{t}$表示前一个簇中心,$n_{t}$表示至今位置分配给这个簇的点的数量, $x_{t}$表示从当前批数据中计算出来的簇中心,$m_{t}$表示当前添加的这批数据的点数量。衰减因子 $\alpha$ 可以被用来忽略过去的聚类中心:当 $\alpha$ 等于1时,所有的批数据赋予相同的权重,当 $\alpha$ 等于0时,数据中心点完全通过当前数据确定。
这里我想稍微讲一下这个衰减因子 $\alpha$ ,看他在公式中,当 $\alpha = 0$ 的时候,其实就相当于完全忽略掉前面历史累积下来的聚类中心的结果了,所以说他是“衰减因子”挺贴切的,这个系数可以在每一次迭代中消减历史数据带来的影响,这个系数越大,则代表着越看重最新的数据。要知道源源不断的流数据,其实还是挺看重实时数据的,比如说你冬天喜欢买羽绒,到了夏天不可能还这么喜欢买羽绒吧。。。
衰减因子$\alpha$ 也可以通过
halfLife
参数联合 时间单元(time unit
)来确定,时间单元可以是一批数据也可以是一个数据点。假如数据从t
时刻到来并定义了halfLife
为h
, 在t+h
时刻,应用到t
时刻的数据的折扣(discount
)为0.5。
halfLife翻译过来就是半衰期,挺有意思的,这个参数可以让衰减因子随着时间来动态调整,越来越依赖新的数据,在过完一个时间单元(或者叫周期吧)重置。应该说这个衰减因子的变化在每个时间单元中独立生效。
算法步骤
流式k-means
算法的步骤如下所示:
- (1)分配新的数据点到离其最近的簇;
- (2)根据时间单元(
time unit
)计算折扣(discount
)值,并更新簇权重; - (3)应用更新规则;
- (4)应用更新规则后,有些簇可能消失了,那么切分最大的簇为两个簇。
应用场景
流式K-means的应用场景还是挺广阔的,比如在实时推荐预测系统方面,比如广告推荐、商业预测之类的,由于推荐预测系统对数据时效性的敏感度较高,而且其数据处于连续实时且快速的变化,所以必须建立起流式的机器学习应用,从而对流式的数据进行实时的预测分析与处理,这对于商业分析与运营而言将十分关键。
另外还有一篇论文,将该算法应用在数据安全领域,用来实时验证分析大数据环境下的DDoS攻击检测,参考论文:《基于Spark Streaming的实时数据分析系统及其应用》
代码demo
这里训练数据我们还是用回之前k-means用过的数据集,值得一提的是,这里的测试集需要改成(1.0), [1.7, 0.4, 0.9]
这种标签向量的格式。我们就把上一个k-means聚类出来的结果作为这里的测试集试试。
每个训练点应格式化为
[x1, x2, x3]
,并且每个测试数据点应格式化为(y, [x1, x2, x3])
,其中y
是一些有用的标签或标识符(例如,真正的类别分配)
除此以外还要注意的是,spark streaming读取文件的时候,旧文件是不识别的,他只会读取新创建的文件,也就是说我们要单独写一个文件IO,把训练集和测试集以流形式给写进去。。。不过生产环境的时候,就可以上kafka来做信息读取了。
下面贴出训练的主函数:
1 | package SparkMLlib.Clustering |
再给出一个文件读写的简易demo:
1 | def FileIODemo(): Unit ={ |
完整代码请前往github查看,文件路径请灵性修改。。。另外数据集分别就是Wholesale customers data_training.txt
和 StreamingKmeans-test.txt
,我会放在github上:点我前往github