Spark Codes


import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("data.csv")

val parsedData = => Vectors.dense(s.split(',').map(_.toDouble)))

val numClusters = 6
val numIterations = 300
val clusters = KMeans.train(parsedData, numClusters, numIterations)

val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

val labeledVectors = clusters.predict(parsedData)
val centers = clusters.clusterCenters


scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

scala> => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

scala> import java.lang.Math
import java.lang.Math

scala> => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

          |For example, the following command runs this app on a synthetic dataset:
          | bin/spark-submit --class org.apache.spark.examples.mllib.BinaryClassification \
          |  examples/target/scala-*/spark-examples-*.jar \
          |  --algorithm LR --regType L2 --regParam 1.0 \
          |  data/mllib/sample_binary_classification_data.txt

Spark to MySQL

Get mysql connector

Run Shell

$spark-shell --jars /path-to-mysql-jar/mysql-connector-java-*.*.**-bin.jar

scala> val url="jdbc:mysql://localhost:3306/hadoopdb"
scala> val username = "hduser"
scala> val password = "******"

val myRDD = new JdbcRDD( sc, () => DriverManager.getConnection(url,username,password) ,
"select first_name,last_name,gender from person limit ?, ?",
1, 5, 2, r => r.getString("last_name") + ", " + r.getString("first_name"))

scala> myRDD.foreach(println)

scala> myRDD.saveAsTextFile("person")


