Spark Word Count Example Using Hadoop as File Store

package com.apporchid.energinet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
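// Implicit conversion that adds pair-RDD operations such as reduceByKey to an RDD of (word, count) tuples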
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

import com.typesafe.config.ConfigFactory

object WordCount {
	def main(args: Array[String]): Unit = {
		// For ConfigFactory, create an application.properties file in the src/main/resources folder
		// and add the line: dev.deploymentMode = local
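		// The deploymentMode value is used as the Spark master below: "local" runs everything in-process;
		// a value like local[*] or a cluster master URL can be used instead.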
		val appConf = ConfigFactory.load()
		val conf = new SparkConf().
			setAppName("Word Count").
			setMaster(appConf.getConfig("dev").getString("deploymentMode"))

		val sc = new SparkContext(conf)
		val inputPath = args(0) //hdfs://localhost:9000/user/root/streamData/testing
		val outputPath = args(1) // hdfs://localhost:9000/user/root/output
		println(inputPath)
		val hadoopConfig = new Configuration()
		hadoopConfig.addResource(new Path("C:\\hadoop-2.6.3\\etc\\hadoop\\core-site.xml"))
		hadoopConfig.set("fs.defaultFS", "hdfs://localhost:9000")
		val fs = FileSystem.get(hadoopConfig)
		val inputPathExists = fs.exists(new Path(inputPath))
		val outputPathExists = fs.exists(new Path(outputPath))

		if (!inputPathExists) {
			println("Invalid input path")
			return
		}
		else {
			println(fs.getHomeDirectory.toString())
		}

		if (outputPathExists)
			fs.delete(new Path(outputPath), true)

		// Read the input file with sc.textFile to get an RDD[String] of lines,
		// split each line on spaces with flatMap,
		// map each word to the pair (word, 1),
		// then sum the counts per word with reduceByKey.
		val wc = sc.textFile(inputPath).
			flatMap(rec => rec.split(" ")).
			map(rec => (rec, 1)).
			reduceByKey((acc, value) => acc + value)
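		// collect() pulls all results back to the driver; fine for a small example, but avoid it for large outputs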
		wc.collect().foreach(println)
		wc.saveAsTextFile(outputPath)
	}
}
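
To compile and package the example, a minimal build.sbt along the following lines should work. This is only a sketch: the Spark, Scala and Typesafe Config versions below are assumptions, so match them to the Spark build and Hadoop install (2.6.3 here) on your machine.

name := "WordCount"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // assumed versions; pick a spark-core build that matches your Hadoop 2.6.x install
  "org.apache.spark" %% "spark-core" % "1.6.1",
  // Typesafe Config, which provides ConfigFactory.load()
  "com.typesafe" % "config" % "1.3.0"
)

After packaging the jar with sbt, it can be submitted with spark-submit, using com.apporchid.energinet.WordCount as the main class and passing the HDFS input and output paths (for example the hdfs://localhost:9000/... paths shown in the comments above) as the two program arguments.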
