spark机器学习笔记:(一)Spark Python初探

article/2025/10/19 14:02:54

声明:版权所有,转载请联系作者并注明出处  http://blog.csdn.net/u013719780?viewmode=contents


博主简介:风雪夜归子(英文名:Allen),机器学习算法攻城狮,喜爱钻研Meachine Learning的黑科技,对Deep Learning和Artificial Intelligence充满兴趣,经常关注Kaggle数据挖掘竞赛平台,对数据、Machine Learning和Artificial Intelligence有兴趣的童鞋可以一起探讨哦,个人CSDN博客:http://blog.csdn.net/u013719780?viewmode=contents


        本系列的博文主要来源于《用spark机器学习》这本书的读书笔记,如果对scala熟悉,建议看原书,原书绝大多数代码都是采用scala书写的,如果对Python比较熟悉,也许本系列笔记对你有所帮助,为了各位童鞋看原书的方便,本系列的博文基本上保持了变量名与原书一致。


         Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。该框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操作都进行了抽象。它提供了一个更高级别的API用于处理分布式数据。从这方面说,它与Apache Hadoop等分布式处理框架类似。但在底层架构上,Spark与它们有所不同。

        Spark起源于加利福利亚大学伯克利分校的一个研究项目。学校当时关注分布式机器学习算法的应用情况。因此,Spark从一开始便为应对迭代式应用的高性能需求而设计。在这类应用中,相同的数据会被多次访问。该设计主要靠利用数据集内存缓存以及启动任务时的低延迟和低系统开销来实现高性能。再加上其容错性、灵活的分布式数据结构和强大的函数式编程接口,Spark在各类基于机器学习和迭代分析的大规模数据处理任务上有广泛的应用,这也表明了其实用性。

        本系列的spark笔记主要采用Python语言。

Spark支持四种运行模式。
 本地单机模式:所有Spark进程都运行在同一个Java虚拟机(Java Vitural Machine,JVM)中。
 集群单机模式:使用Spark自己内置的任务调度框架。
 基于Mesos:Mesos是一个流行的开源集群计算框架。
 基于YARN:即Hadoop 2,它是一个与Hadoop关联的集群计算和资源调度框架。


1. Spark 编程模型

       在对Spark的设计进行更全面的介绍前,我们先介绍SparkContext对象以及Spark shell。后面将通过它们来了解Spark编程模型的基础知识。


1.1 SparkContext类与SparkConf类

         任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如主节点的URL)。初始化后,我们便可用SparkContext对象所包含的各种方法来创建和操作分布式数据集和共享变量。Spark shell(在Scala和Python下可以,但不支持Java)能自动完成上述初始化。若要用Scala代码来实现的话,可参照下面的代码:

val conf = new SparkConf().setAppName("Test Spark App").setMaster("local[4]")
val sc = new SparkContext(conf)

这段代码会创建一个4线程的SparkContext对象,并将其相应的任务命名为Test SparkAPP。我们也可通过如下方式调SparkContext的简单构造函数,以默认的参数值来创建相应的对象。其效果和上述的完全相同:
val sc = new SparkContext("local[4]", "Test Spark App")


1.2 Spark shell

        Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算,shell能在输入代码时给出实时反馈。在Scala shell里,命令执行结果的值与类型在代码执行完后也会显示出来。要想通过Scala来使用Spark shell,只需从Spark的主目录执行./bin/spark-shell。它会启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个对象。

        要想在Python shell中使用Spark,直接运行./bin/pyspark命令即可,如果配置了pyspark的环境变量,则直接运行pyspark命令即可。与Scala shell类似, Python下的SparkContext对象可以通过Python变量sc来调用。上述命令的终端输出应该如下图所示:



1.3 弹性分布式数据集

        RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。Spark中的RDD具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。


1.3.1 创建RDD

        从现有集合创建

collection = list(["a", "b", "c", "d", "e"])
rddFromCollection = sc.parallelize(collection)


        RDD也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。基于Hadoop的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文件、其他Hadoop标准格式、HBase、Cassandra等。以下举例说明如何用一个本地文件系统里的文件创建RDD:

rddFromTextFile = sc.textFile("LICENSE")

上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个表示文本文件中某一行文字的String(字符串)对象。


1.3.2  spark操作

       创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。Spark的操作通常采用函数式风格。对于那些熟悉用Scala或Python进行函数式编程的程序员来说,这不难掌握。但Spark API其实容易上手,所以那些没有函数式编程经验的程序员也不用担心。
       Spark程序中最常用的转换操作便是map操作。该操作对一个RDD里的每一条记录都执行某个函数,从而将输入映射成为新的输出。比如,下面这段代码便对一个从本地文本文件创建的RDD进行操作。它对该RDD中的每一条记录都执行size函数。之前我们曾创建过一个这样的由若干String构成的RDD对象。通过map函数,我们将每一个字符串都转换为一个整数,从而返回一个由若干Int构成的RDD对象。
intsFromStringsRDD = rddFromTextFile.map(lambda line: line.size)

        Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中,count返回一个Long,sum返回一个Double)。这就意味着多个操作可以很自然地前后连接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子相同的结果:
aveLengthOfRecordChained = rddFromTextFile.map(lambda line: line.size).sum() /rddFromTextFile.count()
       值得注意的一点是,Spark中的转换操作是延后的。也就是说,在RDD上调用一个转换操作并不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,从而提高了Spark的效率。

       这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:
transformedRDD = rddFromTextFile.map(lambda line: line.size).filter(lambda size : size > 10).map(lambda size : size * 2)

注意,这里实际上没有触发任何计算,也没有结果被返回。如果我们现在在新的RDD上调用一个执行操作,比如sum,该计算将会被触发:
computation = transformedRDD.sum()


2. RDD缓存策略

       Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache函数来实现:
rddFromTextFile.cache
调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。

Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序创建后发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即可:

>>> broadcastAList = sc.broadcast(list(["a", "b", "c", "d", "e"]))


16/06/26 21:04:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 296.0 B, free 296.0 B)
16/06/26 21:04:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 110.0 B, free 406.0 B)
16/06/26 21:04:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:36878 (size: 110.0 B, free: 517.4 MB)
16/06/26 21:04:50 INFO SparkContext: Created broadcast 0 from broadcast at PythonRDD.scala:430

终端的输出表明,广播变量存储在内存中,占用的空间大概是110字节,仍余下517MB可用空间。

       广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的value方法:

sc.parallelize(list(["1", "2", "3"])).map(lambda x: broadcastAList.value).collect()

       上述一行代码会从{"1", "2", "3"}这个集合(一个Scala List)里,新建一个带有三条记录的RDD。map函数里的代码会返回一个新的List对象。这个对象里的记录由之前创建的那个broadcastAList里的记录与新建的RDD里的三条记录分别拼接而成。

       注意,上述代码使用了collect函数。这个函数是一个Spark执行函数,它将整个RDD以Scala(Python或Java)集合的形式返回驱动程序。collect函数一般仅在的确需要将整个结果集返回驱动程序并进行后续处理时才有必要调用。如果在一个非常大的数据集上调用该函数,可能耗尽驱动程序的可用内存,进而导致程序崩溃。高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便属于这类情况。

        累加器(accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不同,是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算以及返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只允许驱动程序访问。累加器同样可以在Spark代码中通过value访问。


3. Spark Python编程入门

        Spark的Python API几乎覆盖了所有Scala API所能提供的功能,只有极少数的一些特性和个别的API方法,暂时还不支持。但通常不影响我们使用Spark Python进行编程。下面看一个简单的实例:


"""用Python编写的一个简单Spark应用"""

#filename pythonapp.py
from pyspark import SparkContext
sc = SparkContext("local[2]", "First Spark App")
# 将CSV格式的原始数据转化为(user,product,price)格式的记录集
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# 求总购买次数
numPurchases = data.count()
# 求有多少不同客户购买过商品
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# 求和得出总收入
totalRevenue = data.map(lambda record: float(record[2])).sum()
# 求最畅销的产品是什么
products = data.map(lambda record: (record[1], 1.0)).reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])


spark提交脚本命令如下:

tanyouwei@tanyouwei:~$  spark-submit pythonapp.py



http://chatgpt.dhexx.cn/article/4w9C7h0X.shtml

相关文章

Spark Machine Learning(SparkML):机器学习(部分三)

目录 8.协同过滤(Collaborative Filtering) 8.1交替最小二乘ALS 8.2显式和隐式反馈 8.3缩放正则化参数 8.4冷启动策略 8.5代码示例: 9.频繁模式挖掘(Frequent Pattern Mining) FP-Growth PrefixSpan 10.ML优化:模型选择和超参数调优 模型选择&#xff08…

spark机器学习算法研究

spark提供了一个机器学习库,spark ml包,可以在spark中直接引入使用 import org.apache.spark.ml.clustering.{KMeans,KMeansModel} import org.apache.spark.ml.linalg.Vectors1. 常用聚类算法 k-means 算法 GMM 高斯混合模型 PIC 快速迭代聚类 LDA 隐式…

手把手带你玩转Spark机器学习-使用Spark进行数据处理和数据转换

系列文章目录 手把手带你玩转Spark机器学习-专栏介绍手把手带你玩转Spark机器学习-问题汇总手把手带你玩转Spark机器学习-Spark的安装及使用手把手带你玩转Spark机器学习-使用Spark进行数据处理和数据转换手把手带你玩转Spark机器学习-使用Spark构建分类模型手把手带你玩转Spa…

手把手带你玩转Spark机器学习-使用Spark构建分类模型

系列文章目录 手把手带你玩转Spark机器学习-专栏介绍手把手带你玩转Spark机器学习-问题汇总手把手带你玩转Spark机器学习-Spark的安装及使用手把手带你玩转Spark机器学习-使用Spark进行数据处理和数据转换手把手带你玩转Spark机器学习-使用Spark构建分类模型手把手带你玩转Spa…

手把手带你玩转Spark机器学习-Spark的安装及使用

系列文章目录 手把手带你玩转Spark机器学习-专栏介绍手把手带你玩转Spark机器学习-问题汇总手把手带你玩转Spark机器学习-Spark的安装及使用手把手带你玩转Spark机器学习-使用Spark进行数据处理和数据转换手把手带你玩转Spark机器学习-使用Spark构建分类模型手把手带你玩转Spa…

手把手带你玩转Spark机器学习-深度学习在Spark上的应用

系列文章目录 手把手带你玩转Spark机器学习-专栏介绍手把手带你玩转Spark机器学习-问题汇总手把手带你玩转Spark机器学习-Spark的安装及使用手把手带你玩转Spark机器学习-使用Spark进行数据处理和数据转换手把手带你玩转Spark机器学习-使用Spark构建分类模型手把手带你玩转Spa…

Spark Machine Learning(SparkML):机器学习(部分一)

机器学习是现阶段实现人工智能应用的主要方法,它广泛应用于机器视觉、语音识别、自然语言处理、数据挖掘等领域。MLlib是Apache Spark的可伸缩机器学习库。官网地址:[http://spark.apache.org/docs/latest/ml-guide.html] Spark的机器学习(ML)库提供了许多分布式ML算法。这些算…

手把手带你玩转Spark机器学习-使用Spark构建聚类模型

系列文章目录 手把手带你玩转Spark机器学习-专栏介绍手把手带你玩转Spark机器学习-问题汇总手把手带你玩转Spark机器学习-Spark的安装及使用手把手带你玩转Spark机器学习-使用Spark进行数据处理和数据转换手把手带你玩转Spark机器学习-使用Spark构建分类模型手把手带你玩转Spa…

Spark机器学习解析

源码加数据集: 文件源码 Gitee好像只收10M一下的文件类型,所以数据集就只能以链接的形式自己下了 KMeans和决策树KDD99数据集,推荐使用10%的数据集: http://kdd.ics.uci.edu/databases/kddcup99/ ALS电影推荐的Movielens数据集…

Spark大数据处理系列之Machine Learning

Spark的机器学习库(Spark MLlib),包括各种机器学习算法:协同过滤算法、聚类算法、分类算法和其他算法。在前面的《Spark大数据处理》系列文章,介绍Apache Spark框架,介绍如何使用Spark SQL库的SQL接口去访问数据,使用S…

大数据笔记--Spark机器学习(第一篇)

目录 一、数据挖掘与机器学习 1、概念 2、人工智能 3、数据挖掘体系 二、机器学习 1、什么是机器学习 2、机器学习的应用 3、实现机器学习算法的工具与技术框架 三、Spark MLlib介绍 1、简介 2、MLlib基本数据类型 Ⅰ、概述 Ⅱ、本地向量 Ⅲ、向量标签的使用 Ⅳ…

iis 重启 (三种方法)

分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章。分享知识,造福人民,实现我们中华民族伟大复兴! 1 1、界面操作 打开“控制面板”->“管…

IIS中应用程序池自动停止,重启报错

阅文时长| 0.2分钟字数统计| 329.6字符主要内容| 1、引言&背景 2、解决方案 3、声明与参考资料 『IIS中应用程序池自动停止,重启报错』编写人| SCscHero 编写时间| 2022/1/3 PM12:32文章类型| 系列完成度| 已完成座右铭每一个伟大的事业,都有一个微不…

服务器上系统怎么启动iis,IIS服务器如何重新启动

IIS的设计目的是建立一套集成的服务器服务,用以支持HTTP,FTP和SMTP,它能够提供快速且集成了现有产品,同时可扩展的Internet服务器,而最近就有很多小伙伴问小编IIS服务器如何重新启动,其实很简单&#xff0c…

解决:IIS 假死,运行一段时间服务器上所有网站打不开,必须要重启服务器才行,重启IIS都没用。怎么解决,解决方案

tip:我服务器问题是 每个5-6天后,服务器上的所有网站都不能访问,重启IIS无用,必须重启服务器! 后,请教周华伟周经理后,找到解决方案如下: 第一原因: 这是程序池造成的&#xff0c…

iis服务器 关闭自动启动,设置IIS服务器定时自动重启的方法

最近,有一朋友的IIS服务器老是出现问题,运行一段时间下来就会出现访问服务器上的网站时提示数据库连接出错,然后重启IIS后网站又能正常访问了,实在找不出是什么原因导致了这个问题。不过最终我想到了一个笨办法,就是让…

C#实现对IIS网站和应用程序池实时监测(网站停止后自动重启)

一、需求分析 在我们的日常运维中,可能会遇到业务网站在运行一段时间后由于某些不确定因素而停止运行,导致业务功能受影响,而此时只要我们重启服务又能够正常运行了,在我们还没有完全排查从根本上解决问题前,需要一个临时的方法来救场(即:当发现业务网站停止后能够自动重…

bat脚本重启IIS中的网站

bat脚本实现重启IIS中的网站 echo off %1 mshta vbscript:CreateObject("Shell.Application").ShellExecute("cmd.exe","/c %~s0 ::","","runas",1)(window.close)&&exit cd /d C:\Windows\System32\inetsrv\ tas…

Windows服务器设置IIS定时重启的方法,带图详解

我们在使用Windows2008下IIS服务器时会经常出现资源耗尽的现象,运行一段时间下来就会出现访问服务器上的网站时提示数据库连接出错,重启IIS后网站又能正常访问了,这个问题可能困扰了很多站长朋友。 一位做网站的笔友经过不断的实践找到了一个…

【服务器】iis的重启服务器、重启iis、回收(重启)线程池、刷新网站

windows自带iis,如果没有的也可以控制面板中启用 1、在开始按钮点击右键,选择控制面板。 2、接着我们从控制面板选择“程序”。 3、然后选择“启用或关闭windows功能”。 4、从列表中选择Internet Infomation Services。 5、并且把相应的功能条…