Spark编程实验六:Spark机器学习库MLlib编程

2024-06-29 1198阅读

目录

一、目的与要求

二、实验内容

三、实验步骤

1、数据导入

2、进行主成分分析(PCA)

3、训练分类模型并预测居民收入 

4、超参数调优

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握基本的MLLib编程方法;

2、掌握用MLLib解决一些常见的数据分析问题,包括数据导入、成分分析和分类和预测等。

二、实验内容

1.数据导入

        从文件中导入数据,并转化为DataFrame。

2、进行主成分分析(PCA)

        对6个连续型的数值型变量进行主成分分析。PCA(主成分分析)是通过正交变换把一组相关变量的观测值转化成一组线性无关的变量值,即主成分的一种方法。PCA通过使用主成分把特征向量投影到低维空间,实现对特征向量的降维。请通过setK()方法将主成分数量设置为3,把连续型的特征向量转化成一个3维的主成分。

3、训练分类模型并预测居民收入

        在主成分分析的基础上,采用逻辑斯蒂回归,或者决策树模型预测居民收入是否超过50K;对Test数据集进行验证。

4、超参数调优

        利用CrossValidator确定最优的参数,包括最优主成分PCA的维数、分类器自身的参数等。

附:数据集:

        下载Adult数据集(http://archive.ics.uci.edu/ml/datasets/Adult)。数据从美国1994年人口普查数据库抽取而来,可用来预测居民收入是否超过50K$/year。该数据集类变量为年收入是否超过50k$,属性变量包含年龄、工种、学历、职业、人种等重要信息,值得一提的是,14个属性变量中有7个类别型变量。

Spark编程实验六:Spark机器学习库MLlib编程

Index.txt文件内容: 

Index of adult

02 Dec 1996      140 Index

10 Aug 1996  3974305 adult.data

10 Aug 1996     4267 adult.names

10 Aug 1996  2003153 adult.test

三、实验步骤

1、数据导入

        从文件中导入数据,并转化为DataFrame。

//导入需要的包
from pyspark.ml.feature import PCA 
from pyspark.sql import Row
from pyspark.ml.linalg import Vector,Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.classification import BinaryLogisticRegressionSummary, LogisticRegression
from pyspark.sql import functions
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
//获取训练集测试集(需要对测试集进行一下处理,adult.data.txt的标签是>50K和50K.和>> def f(x):
       rel = {}
       rel['features']=Vectors.dense(float(x[0]),float(x[2]),float(x[4]),float(x[10]),float(x[11]),float(x[12]))
       rel['label'] = str(x[14])
       return rel
>>> df = spark.sparkContext.textFile("file:///usr/local/spark/adult.data.txt").map(lambda line: line.split(',')).map(lambda p: Row(**f(p))).toDF() 
df: pyspark.sql.DataFrame = [features: vector, label: string]
>>> test = spark.sparkContext.textFile("file:///usr/local/spark/adult.test.txt").map(lambda line: line.split(',')).map(lambda p: Row(**f(p))).toDF()
test: pyspark.sql.DataFrame = [features: vector, label: string]

2、进行主成分分析(PCA)

        对6个连续型的数值型变量进行主成分分析。PCA(主成分分析)是通过正交变换把一组相关变量的观测值转化成一组线性无关的变量值,即主成分的一种方法。PCA通过使用主成分把特征向量投影到低维空间,实现对特征向量的降维。请通过setK()方法将主成分数量设置为3,把连续型的特征向量转化成一个3维的主成分。

        构建PCA模型,并通过训练集进行主成分分解,然后分别应用到训练集和测试集。

>>> pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures").fit(df)
pca: pyspark.ml.feature.PCAModel = PCA_4a668f4a52beccad9526
>>> result = pca.transform(df)
result: pyspark.sql.DataFrame = [features: vector, label: string, pcaFeatures: vector]
>>> testdata = pca.transform(test)
testdata: pyspark.sql.DataFrame = [features: vector, label: string, pcaFeatures: vector] 
  
>>> result.show(truncate=False)
+------------------------------------+------+-----------------------------------------------------------+
|features                            |label |pcaFeatures |
+------------------------------------+------+-----------------------------------------------------------+
|[39.0,77516.0,13.0,2174.0,0.0,40.0] |  featureIndexer = VectorIndexer(inputCol="pcaFeatures", outputCol="indexedFeatures").fit(result)
featureIndexer: pyspark.ml.feature.VectorIndexerModel = VectorIndexer_48bc920d8af88e337d21
>>> print(featureIndexer.numFeatures)
3
  
>>> labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",labels=labelIndexer.labels)
labelConverter: pyspark.ml.feature.IndexToString = IndexToString_40e99a67399e57d7950c 
>>> lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100)
lr: pyspark.ml.classification.LogisticRegression = LogisticRegression_44efaefad414357b7c36
  
>>> lrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
lrPipeline: pyspark.ml.Pipeline = Pipeline_49a886038fe4366cb525 
>>> lrPipelineModel = lrPipeline.fit(result)
lrPipelineModel: pyspark.ml.PipelineModel = PipelineModel_43eb8e7d01dae015460c 
>>> lrModel = lrPipelineModel.stages[2]
lrModel:pyspark.ml.classification.LogisticRegressionModel = LogisticRegression_44efaefad414357b7c36
>>> print ("Coefficients: \n " + str(lrModel.coefficientMatrix)+"\nIntercept: "+str(lrModel.interceptVector)+ "\n numClasses: "+str(lrModel.numClasses)+"\n numFeatures: "+str(lrModel.numFeatures))
Coefficients: 
 DenseMatrix([[-1.98285864e-07, -3.50909247e-04, -8.45150628e-04]])
Intercept: [-1.4525982557843347]
 numClasses: 2
 numFeatures: 3
  
>>> lrPredictions = lrPipelineModel.transform(testdata)
lrPredictions: pyspark.sql.DataFrame = DataFrame[features: vector, label: string, pcaFeatures: vector, indexedLabel: double, indexedFeatures: vector, rawPrediction: vector, probability: vector, prediction: double, predictedLabel: string] 
>>> evaluator = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
evaluator: pyspark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator_44fb8a00fb8868ae541f 
>>> lrAccuracy = evaluator.evaluate(lrPredictions)
lrAccuracy: Double = 0.7764235163053484
>>> print("Test Error = %g " % (1.0 - lrAccuracy))
Test Error = 0.223576

4、超参数调优

        利用CrossValidator确定最优的参数,包括最优主成分PCA的维数、分类器自身的参数等。

>>> pca = PCA().setInputCol("features").setOutputCol("pcaFeatures")
pca: pyspark.ml.feature.PCA = PCA_465ea3aeee8f823b1cc2
>>> labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
labelIndexer: pyspark.ml.feature.StringIndexerModel = StringIndexer_4a4caa1f671823df2712 
>>> featureIndexer = VectorIndexer().setInputCol("pcaFeatures").setOutputCol("indexedFeatures")
featureIndexer: pyspark.ml.feature.VectorIndexer = VectorIndexer_4a87a808787866220518
>>> labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
labelConverter: pyspark.ml.feature.IndexToString = IndexToString_444190300664cc71e5b5
>>> lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100)
lr: pyspark.ml.classification.LogisticRegression = LogisticRegression_4ff3b577b810fd21ab1b
>>> lrPipeline = Pipeline().setStages([pca, labelIndexer, featureIndexer, lr, labelConverter])
lrPipeline: pyspark.ml.Pipeline = Pipeline_4165a34a906306ee044a
>>> paramGrid = ParamGridBuilder().addGrid(pca.k, [1,2,3,4,5,6]).addGrid(lr.elasticNetParam, [0.2,0.8]).addGrid(lr.regParam, [0.01, 0.1, 0.5]).build()
paramGrid: Array[pyspark.ml.param.ParamMap] =
{Param(parent=u'LogisticRegression_4ff3b577b810fd21ab1b', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.2, Param(parent=u'LogisticRegression_4ff3b577b810fd21ab1b', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent=u'PCA_465ea3aeee8f823b1cc2', name='k', doc='the number of principal components'): 1}
{Param(parent=u'LogisticRegression_4ff3b577b810fd21ab1b', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.2, Param(parent=u'LogisticRegression_4ff3b577b810fd21ab1b', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent=u'PCA_465ea3aeee8f823b1cc2', name='k', doc='the number of principal components'): 2}
{Param(parent=u'LogisticRegression_4ff3b577b810fd21ab1b', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.2, Param(parent=u'LogisticRegression_4ff3b577b810fd21ab1b', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent=u'PCA_465ea3ae……
>>> cv = CrossValidator().setEstimator(lrPipeline).setEvaluator(MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")).setEstimatorParamMaps(paramGrid).setNumFolds(3)
cv: pyspark.ml.tuning.CrossValidator = CrossValidator_4d4eaeb04035ccae91e2
>>> cvModel = cv.fit(df)
cvModel: pyspark.ml.tuning.CrossValidatorModel = CrossValidatorModel_4601a7d61debbfd3544e
>>> lrPredictions=cvModel.transform(test)
lrPredictions: pyspark.sql.DataFrame = [features: vector, label: string, pcaFeatures: vector, indexedLabel: double, indexedFeatures: vector, rawPrediction: vector, probability: vector, prediction: double, predictedLabel: string] 
>>> evaluator = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
evaluator: pyspark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator_40bfa39a6a73931437c8
>>> lrAccuracy = evaluator.evaluate(lrPredictions)
lrAccuracy: Double = 0.7833268290041506
>>> print("准确率为"+str(lrAccuracy))
准确率为0.7833268290041506
>>> bestModel= cvModel.bestModel
bestModel: pyspark.ml.PipelineModel = PipelineModel_47388ab70ca452562894
>>> lrModel = bestModel.stages[3]
lrModel: pyspark.ml.classification.LogisticRegressionModel = LogisticRegression_46d894d2cea1ed552ec5
>>> print ("Coefficients: \n " + str(lrModel.coefficientMatrix)+"\nIntercept: "+str(lrModel.interceptVector)+ "\n numClasses: "+str(lrModel.numClasses)+"\n numFeatures: "+str(lrModel.numFeatures))
Coefficients: 
 DenseMatrix([[-1.50035172e-07, -1.68933655e-04, -8.83869475e-04,
               4.92262006e-02,  3.10992712e-02, -2.81742804e-01]])
Intercept: [-7.459195847829245]
 numClasses: 2
 numFeatures: 6
>>> pcaModel = bestModel.stages[0]
pcaModel: pyspark.ml.feature.PCAModel = PCA_423c88604bc4e9c371f3
>>> print("Primary Component: " + str(pcaModel.pc))
Primary Component: -9.905077142269292E-6   -1.435140700776355E-4   ... (6 total)
0.9999999987209459      3.0433787125958012E-5   ...
-1.0528384042028638E-6  -4.2722845240104086E-5  ...
3.036788110999389E-5    -0.9999984834627625     ...
-3.9138987702868906E-5  0.0017298954619051868   ...
-2.1955537150508903E-6  -1.3109584368381985E-4  ...

可以看出,PCA最优的维数是6。

四、结果分析与实验体会

        MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作 MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的流水线(Pipeline)API。通过对 Spark 机器学习库 MLlib 的编程实验,我体会到了以下几个方面的丰富之处:

  1. 广泛的算法覆盖: MLlib 提供了各种机器学习算法的实现,包括线性回归、逻辑回归、决策树、随机森林、梯度提升树、支持向量机、朴素贝叶斯、聚类算法(如K-means和层次聚类)、推荐系统(如协同过滤和基于矩阵分解的方法)等。这使得我们可以选择最适合特定任务的算法进行建模和预测。

  2. 大规模数据处理: 基于 Spark 引擎,MLlib 可以处理大规模数据集,利用分布式计算能力进行高效的机器学习任务。分布式数据处理和计算可以加速训练过程,使其适用于处理海量数据的场景。

  3. DataFrame API: MLlib 使用 Spark 的 DataFrame API 进行数据处理和特征工程,这个 API 提供了丰富的函数和转换操作,使得数据清洗、特征提取和转换等流程更加简洁和可扩展。

  4. 模型持久化与加载: MLlib 支持将训练好的模型保存到磁盘,并且可以方便地加载模型进行预测和推理。这样,在实际应用中,可以将模型部署到生产环境中,进行实时的数据处理和预测。

  5. 参数调优工具: MLlib 提供了交叉验证和参数网格搜索等调参工具,帮助我们优化模型的超参数选择,提高模型的性能和泛化能力。

        通过深入学习和实践 MLlib,我们可以更好地理解和应用各种机器学习算法,掌握大规模数据处理和分布式计算的技巧,为解决实际问题提供强大的工具和框架。MLlib 的丰富性使得我们能够灵活选择和组合不同的算法和技术,以满足不同场景下的需求,并构建出高效、准确的机器学习模型。

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]