package com.apporchid.energinet
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import com.typesafe.config.ConfigFactory

object WordCount {

  def main(args: Array[String]): Unit = {
    // For ConfigFactory, create an application.properties file in the src/main/resources
    // folder and add the following line: dev.deploymentMode = local
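    // A minimal sketch of the expected src/main/resources/application.properties
    // (values other than "local", e.g. local[*] or yarn-client, are illustrations,
    // not part of this project):
    //   dev.deploymentMode = local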
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Word Count").
      setMaster(appConf.getConfig("dev").getString("deploymentMode"))
    val sc = new SparkContext(conf)
    // Input and output paths are passed as program arguments, e.g.
    //   hdfs://localhost:9000/user/root/streamData/testing
    //   hdfs://localhost:9000/user/root/output
    val inputPath = args(0)
    val outputPath = args(1)
    println(inputPath)

    // Build a Hadoop configuration so the paths can be validated on HDFS before the job runs.
    val hadoopConfig = new Configuration()
    hadoopConfig.addResource(new Path("C:\\hadoop-2.6.3\\etc\\hadoop\\core-site.xml"))
    hadoopConfig.set("fs.defaultFS", "hdfs://localhost:9000")
    val fs = FileSystem.get(hadoopConfig)

    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    if (!inputPathExists) {
      println("Invalid input path")
      return
    } else {
      println(fs.getHomeDirectory.toString)
    }

    // saveAsTextFile fails if the output directory already exists, so remove it first.
    if (outputPathExists)
      fs.delete(new Path(outputPath), true)
    // Read the input file with sc.textFile into an RDD[String],
    // split each line on spaces with flatMap,
    // map each word to the pair (word, 1),
    // and use reduceByKey to sum the counts for each word.
    val wc = sc.textFile(inputPath).
      flatMap(rec => rec.split(" ")).
      map(rec => (rec, 1)).
      reduceByKey((acc, value) => acc + value)

    wc.collect().foreach(println)
    wc.saveAsTextFile(outputPath)

    sc.stop()
  }
}
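
// Example submission (a sketch; the jar name and build path are assumptions --
// only the class name and HDFS paths come from this file):
//   spark-submit --class com.apporchid.energinet.WordCount \
//     target/scala-2.11/wordcount.jar \
//     hdfs://localhost:9000/user/root/streamData/testing \
//     hdfs://localhost:9000/user/root/output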