Spark Code Snippets
Clustering
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data: one comma-separated numeric vector per line.
// Cache it, since KMeans makes multiple passes over the data.
val data = sc.textFile("data.csv")
val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()

// Cluster the data into 6 classes.
val numClusters = 6
val numIterations = 300
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// Evaluate the clustering by computing the Within Set Sum of Squared Errors.
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

// Assign each vector to its nearest cluster and save the labels.
val labeledVectors = clusters.predict(parsedData)
labeledVectors.saveAsTextFile("kmeans_labels") // output path is an example

// The cluster centroids themselves.
val centers = clusters.clusterCenters
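
A common way to pick the number of clusters is to sweep k and compare the WSSSE values (the elbow method). A minimal sketch, reusing parsedData and numIterations from above; the range of k is an arbitrary choice:

// Sketch (elbow method): train one model per candidate k and print its WSSSE.
(2 to 10).foreach { k =>
  val model = KMeans.train(parsedData, k, numIterations)
  println(s"k=$k  WSSSE=${model.computeCost(parsedData)}")
}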
===================================================================
Spark Shell Quick Start
scala> textFile.count() // Number of items in this RDD
res0: Long = 126
scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) // Most words in a single line
res4: Int = 15
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // Per-word counts via flatMap + map + reduceByKey
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
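To see the most frequent words rather than an arbitrary slice of the array, swap each pair and sort by count. A small follow-up sketch, assumed rather than taken from the original session:

scala> wordCounts.map { case (w, c) => (c, w) }.sortByKey(ascending = false).take(5) // Top 5 words by count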
scala> linesWithSpark.cache() // Mark this RDD to be kept in memory, so later counts hit the cache
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
For example, the following command runs the MLlib BinaryClassification example app on a synthetic dataset:

  bin/spark-submit --class org.apache.spark.examples.mllib.BinaryClassification \
    examples/target/scala-*/spark-examples-*.jar \
    --algorithm LR --regType L2 --regParam 1.0 \
    data/mllib/sample_binary_classification_data.txt
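
In code, that amounts to L2-regularized logistic regression on a LIBSVM-format file. A rough sketch of the equivalent MLlib calls (not the example app's actual source):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.util.MLUtils

// Sketch: L2-regularized logistic regression, matching the
// --algorithm LR --regType L2 --regParam 1.0 flags above.
val training = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_binary_classification_data.txt")
val lr = new LogisticRegressionWithLBFGS()
lr.optimizer.setRegParam(1.0).setUpdater(new SquaredL2Updater)
val model = lr.run(training)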
Spark to MySQL
Get the MySQL Connector/J JAR:
http://dev.mysql.com/downloads/connector/j/
Run the shell with the connector JAR on the classpath:
$ spark-shell --jars /path-to-mysql-jar/mysql-connector-java-*-bin.jar
scala> import java.sql.DriverManager
scala> import org.apache.spark.rdd.JdbcRDD

scala> val url = "jdbc:mysql://localhost:3306/hadoopdb"
scala> val username = "hduser"
scala> val password = "******"

// JdbcRDD substitutes the lower bound (1) and upper bound (5) into the
// two ? placeholders of the query and splits that range across 2 partitions.
scala> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url, username, password),
         "select first_name, last_name, gender from person limit ?, ?",
         1, 5, 2, r => r.getString("last_name") + ", " + r.getString("first_name"))

scala> myRDD.foreach(println)
scala> myRDD.saveAsTextFile("person")
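
Writing results back to MySQL is usually done with plain JDBC, opening one connection per partition rather than one per row. A sketch, assuming a hypothetical person_out(full_name) table exists in the same database:

// Sketch: insert each record back into MySQL via JDBC.
// person_out is a hypothetical target table, not part of the setup above.
myRDD.foreachPartition { rows =>
  val conn = DriverManager.getConnection(url, username, password)
  val stmt = conn.prepareStatement("insert into person_out (full_name) values (?)")
  rows.foreach { name =>
    stmt.setString(1, name)
    stmt.executeUpdate()
  }
  stmt.close()
  conn.close()
}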