1. 原理和理论基础
2. Spark代码实例:
1)Windows 单机
// BUG FIX: NaiveBayesModel is used below (NaiveBayesModel.load) but was not imported.
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Windows single-machine Naive Bayes example.
 *
 * Reads lines of the form "<label>,<f1> <f2> <f3>", trains a multinomial
 * Naive Bayes model on a 60/40 split, computes test accuracy, then saves
 * and reloads the model from the local filesystem.
 */
object local_NaiveBayes {
  // BUG FIX: the property Hadoop actually reads is "hadoop.home.dir",
  // not "hadoop.dir.home" — with the typo, winutils.exe is never found.
  System.setProperty("hadoop.home.dir", "E:/zhuangji/winutil/")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NaiveBayes")
    val sc = new SparkContext(conf)

    // Load and parse the data: "<label>,<space-separated features>".
    val data = sc.textFile("E:/Java_WS/ScalaDemo/data/sample_naive_bayes_data.txt")
    val parsedData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
    }

    // 60/40 train/test split; fixed seed keeps the split reproducible.
    val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0)
    val test = splits(1)

    // Multinomial NB with additive (Laplace) smoothing lambda = 1.0.
    val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
    val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
    val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()

    // Persist the model and load it back.
    model.save(sc, "E:/Spark/models/NaiveBayes")
    val sameModel = NaiveBayesModel.load(sc, "E:/Spark/models/NaiveBayes")

    // BUG FIX: stop the context so local resources are released cleanly.
    sc.stop()
  }
}
2)集群模式
需要打包,然后通过spark-submit 提交到yarn client或者cluster中:
spark-submit --class myNaiveBayes --master yarn ScalaDemo.jar
import org.apache.spark.mllib.classification.{NaiveBayesModel, NaiveBayes}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Cluster-mode Naive Bayes example, submitted via spark-submit to YARN.
 *
 * Reads labeled data from HDFS ("<label>,<f1> <f2> <f3>" per line),
 * trains a multinomial Naive Bayes model on a 60/40 split, computes
 * test accuracy, then saves and reloads the model from HDFS.
 */
object myNaiveBayes {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NaiveBayes")
    val sc = new SparkContext(sparkConf)

    // Parse each HDFS line into a LabeledPoint: label before the comma,
    // space-separated feature values after it.
    val labeledPoints = sc
      .textFile("hdfs://nameservice1/user/hive/spark/data/sample_naive_bayes_data.txt")
      .map { line =>
        val Array(labelPart, featurePart) = line.split(',')
        val features = Vectors.dense(featurePart.split(' ').map(_.toDouble))
        LabeledPoint(labelPart.toDouble, features)
      }

    // Deterministic 60/40 train/test split.
    val Array(trainSet, testSet) = labeledPoints.randomSplit(Array(0.6, 0.4), seed = 11L)

    // Multinomial NB with Laplace smoothing (lambda = 1.0); score on the
    // held-out set by comparing predictions with true labels.
    val model = NaiveBayes.train(trainSet, lambda = 1.0, modelType = "multinomial")
    val predictionAndLabel = testSet.map(point => (model.predict(point.features), point.label))
    val accuracy =
      1.0 * predictionAndLabel.filter { case (predicted, actual) => predicted == actual }.count() /
        testSet.count()

    // Round-trip the trained model through HDFS.
    model.save(sc, "hdfs://nameservice1/user/hive/spark/NaiveBayes/model")
    val sameModel = NaiveBayesModel.load(sc, "hdfs://nameservice1/user/hive/spark/NaiveBayes/model")
  }
}
3)pyspark 代码实例
可以直接利用spark-submit提交,但注意无法以cluster模式提交到集群(cluster模式目前不支持独立集群、mesos集群以及python应用程序)
spark-submit pyNaiveBayes.py
# -*- coding:utf-8 -*-
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonPi")

    def parseLine(line):
        """Parse '<label>,<f1> <f2> ...' into a LabeledPoint."""
        parts = line.split(',')
        label = float(parts[0])
        features = Vectors.dense([float(x) for x in parts[1].split(' ')])
        return LabeledPoint(label, features)

    data = sc.textFile(
        "hdfs://nameservice1/user/hive/spark/data/sample_naive_bayes_data.txt"
    ).map(parseLine)

    # 60/40 train/test split; fixed seed keeps the split reproducible.
    training, test = data.randomSplit([0.6, 0.4], seed=0)

    # Naive Bayes with smoothing parameter 1.0.
    model = NaiveBayes.train(training, 1.0)

    predictionAndLabel = test.map(lambda p: (model.predict(p.features), p.label))
    # BUG FIX: the original `lambda (x, v): x == v` relied on tuple-parameter
    # unpacking, which was removed in Python 3 (PEP 3113) — index the tuple.
    accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test.count()

    # Persist the model to HDFS and load it back.
    model.save(sc, "hdfs://nameservice1/user/hive/spark/PythonNaiveBayes/model")
    sameModel = NaiveBayesModel.load(sc, "hdfs://nameservice1/user/hive/spark/PythonNaiveBayes/model")

    # BUG FIX: the original script ended with a stray `}` (a Scala-ism),
    # which is a SyntaxError in Python. Stop the context instead.
    sc.stop()
3. Python
from sklearn import naive_bayes
import random


def SplitData(data, M, k, seed):
    """Randomly split an iterable of lines into (train, test).

    Each line lands in the test set with probability ~1/M (when its random
    bucket equals k); everything else goes to the training set.
    """
    test = []
    train = []
    random.seed(seed)
    for line in data:
        # BUG FIX: randint(0, M) is inclusive at BOTH ends, producing M+1
        # buckets instead of M; use randint(0, M - 1) so the test fraction
        # is the intended 1/M.
        if random.randint(0, M - 1) == k:
            # `''.join(line)` on a str is a no-op — append the line directly.
            test.append(line)
        else:
            train.append(line)
    return train, test


def parseData(data, delimiter1, delimiter2):
    """Split lines '<label><delimiter1><f1><delimiter2><f2>...' into X, y."""
    x = []
    y = []
    for line in data:
        parts = line.split(delimiter1)
        x.append([float(a) for a in parts[1].split(delimiter2)])
        y.append(float(parts[0]))
    return x, y


# BUG FIX: use a context manager so the file handle is always closed
# (the original left the file open for the life of the process).
with open('e:/java_ws/scalademo/data/sample_naive_bayes_data.txt', 'r') as data:
    training, test = SplitData(data, 4, 2, 10)

trainingX, trainingY = parseData(training, ',', ' ')
testX, testY = parseData(test, ',', ' ')

# Fit a Gaussian Naive Bayes classifier on the training split.
model = naive_bayes.GaussianNB()
model.fit(trainingX, trainingY)

# Evaluate: print the prediction next to each test sample.
for b in testX:
    # BUG FIX: sklearn's predict expects a 2-D array (n_samples, n_features);
    # passing the bare 1-D sample raises an error in modern scikit-learn.
    print(model.predict([b]), b)
posted on 2016-11-22 11:52 阅读( ...) 评论( ...)