spark-scala-tutorial 0

A free tutorial for Apache Spark.

2 years after

Apache Spark Scala Tutorial

Join the chat at

Dean Wampler, Ph.D. Lightbend [email protected] @deanwampler

This tutorial demonstrates how to write and run Apache Spark applications using Scala (with some SQL). You can run the examples and exercises locally on a workstation, on Hadoop (which could also be on your workstation), or both.

This tutorial is mostly about learning Spark, but I teach you a little Scala as we go. If you are more interested in learning just enough Scala for Spark programming, see my new tutorial Just Enough Scala for Spark.

If you are most interested in using Spark with Hadoop, the Hadoop vendors have preconfigured, virtual machine "sandboxes" with Spark included. See their websites for information.

For more advanced Spark training and services from Lightbend, please visit

Setup Instructions

You can work through the examples and exercises on a local workstation, so-called local mode. If you have Hadoop version 2 (YARN based) installation available, including a virtual machine "sandbox" from one of the Hadoop vendors, you can also run most of the examples in that environment. I'll refer to this arrangement as Hadoop mode. Finally, the exercises should be runnable in Mesos and Spark Standalone clusters with minor tweaks.

Let's discuss setup for local mode first.

Setup for Local Mode

Working in local mode makes it easy to edit, test, run, and debug applications quickly. Then, running them in a cluster provides more real-world testing and finally production scalability and resiliency.

We will build and run the examples and exercises using SBT. You'll need to install SBT, if you don't already have it installed. Follow the instructions on the download page. SBT puts itself on your path. However, if you have a custom installation that isn't on your path, define the environment variable SBT_HOME (MacOS, Linux, or Cygwin only).

NOTE for Windows users

You will need to download a utility program called winutils.exe.

If you have a Hadoop installation on your PC, see if %HADOOP_HOME%\bin\winutils.exe already exists.

If not, download it from here:

If you have a Hadoop installation on your PC (just not winutils.exe...), move the downloaded winutils.exe to the %HADOOP_HOME%\bin directory.

If you don't have a Hadoop installation on your PC, create a directory somewhere, e.g., C:\hadoop, and a bin subdirectory, e.g., C:\hadoop\bin. Move winutils.exe to the bin directory.

Finally, make sure that Spark can find it. Do one of the following:

  • Define HADOOP_HOME to point to the parent of bin, e.g., set HADOOP_HOME=C:\hadoop.
  • Define JAVA_OPTS to point to the parent of bin, e.g., set JAVA_OPTS=-Dhadoop.home.dir=C:\hadoop

Define either variable in the same command window that you'll use to run the the examples.

Setup for Hadoop Mode

NOTE: If you are here to learn Spark, you don't need to setup these exercises for Hadoop execution. Come back to these instructions when you're ready to try working with Spark on Hadoop. Also, this "mode" is not as well tested as I would like, so please report bugs or send patch requests!

If you want to run the examples on Hadoop, choose one of the following options.

The Hadoop vendors all provide virtual machine "sandboxes" that you can load into VMWare, VirtualBox, and other virtualization environments. Most of them now bundle Spark. Check the vendor's documentation.

If you have a Hadoop cluster installation or a "vanilla" virtual machine sandbox, verify if Spark is already installed. For example, log into a cluster node, edge node, or the sandbox and try running spark-shell. If it's not found, then assume that Spark is not installed. Your Hadoop vendor's web site should have information on installing and using Spark. In most cases, it will be as simple as downloading an appropriate Spark build from the Spark download page. Select the distribution built for your Hadoop distribution.

Assuming you don't have administration rights, it's sufficient to expand the archive in your home directory on the cluster node or edge node you intend to use, or within the sandbox. Then add the bin directory under the Spark installation directory to your PATH or define the environment variable SPARK_HOME to match the installation directory, not the bin directory.

You'll need to copy this tutorial to the same server or sandbox. Copy the data to HDFS using the following command, which copies the tutorial's data directory to /user/$USdata:

hadoop fs -put data data

If you want to put the data directory somewhere else, you can, but you'll need to always specify that input location when you run the examples in Hadoop.

You'll also need SBT on the server or sandbox to run the examples. Recall that I recommend going through the tutorial on your local workstation first, then move everything to the cluster node or sandbox to try running the examples in Hadoop.

Using SBT for the Examples and Exercises

From now on, except where noted, the instructions apply for both your local workstation and Hadoop setup.

First, change to the root directory for this tutorial.

If you are working through the tutorial on your local machine, run the SBT command-line interface:

Start the SBT interpreter. Here the $ is the prompt for your command-line shell and > is the SBT prompt:

$ sbt

The prompts can be confusing. We'll see a third prompt shortly, for the Scala interpreter, which is scala>. To remind yourself which interpreter you are using, look at the prompt!

If you are trying the tutorial on a Hadoop cluster or edge node, or in a sandbox, change to the tutorial root directory and run the command ./ It will start SBT and load this tutorial. There are options for this script. Use ./ --help to see those options.

Building and Testing

To ensure that the basic environment is working, compile the code and run the tests:

> test

You can also run SBT with any target from your shell, e.g., $ sbt test.

All dependencies are downloaded, the code is compiled, and the tests are executed. This will take a few minutes the first time, especially because of the dependency downloads, and the tests should pass without error. (We've noticed that sometimes a timeout of some kind prevents the tests from completing successfully, but running the tests again works.)

Tests are provided for most, but not all of the examples. The tests run Spark in local mode only, in a single JVM process without using Hadoop.

Running the Examples

Next, let's run one of the examples both locally or with Hadoop to further confirm that everything is working.

Running a Local-mode Example

Start the SBT interpreter, if necessary:

$ sbt

At the sbt prompt, invoke run-main WordCount3.

An alternative is to enter the run command and have SBT ask you which of the available programs to run. They are listed with a number. Find the entry for WordCount3 and enter the corresponding number at the prompt, then hit RETURN. (Unfortunately, they are not listed in alphabetical order.)

Note the output directory listed in the log messages. Use your workstation's file browser or a command window to view the output in the directory, which will be output/kjv-wc3. You should find _SUCCESS and part-00000 files, following Hadoop conventions, where the latter contains the actual data for the "partitions". The _SUCCESS files are empty. They are written when output to the part-NNNNN files is completed, so that other applications watching the directory know it's safe to read the data.

Running a Hadoop Example

Start SBT and invoke the command, run hadoop.HWordCount3. There will be more log messages and it will take longer to run.

The log messages end with a URL where you can view the output in HDFS, using either the hadoop fs shell command or the HDFS file browser that comes with your distribution.

If you are using the hadoop fs command from a login window, ignore everything in the URL up to the output directory. In other words, you will type the following command for this example:

hadoop fs -ls output/kjv-wc3

If you are using your HDFS file browser, the host IP address and port in the full URL are only valid for sandbox virtual machines. On a real cluster, consult your administrator for the host name and port for your Name Node (the HDFS master process).

For example, in a sandbox, if its IP address is, the URL will be Once you've opened this file browser, it will be easy to navigate to the outputs for the rest of the examples we'll run. If you get a 404 error, check with the sandbox documentation for the correct URL to browse HDFS files (and send us a pull request with a fix!).

Either way, the same _SUCCESS and part-00000 files will be found, although the lines in the latter might not be in the same order as for the local run.

Assuming you encountered no problems, everything is working!

NOTE: The normal Hadoop and Spark convention is to never overwrite existing data. However, that would force us to manually delete old data before rerunning programs, an inconvenience while learning. So, to make the tutorial easier, the programs delete existing data first. Don't do this in production unless you know it's okay!

Naming Conventions

We're using a few conventions for the package structure and main class names:

  • FooBarN.scala - The FooBar compiled program for the N^th^ example. With a few exceptions, it can be run locally and in Hadoop. It defaults to local execution.
  • FooBarN-script.scala - The FooBar script for the N^th^ example. It is run using the spark-shell for local mode and cluster execution, or using the console (interactive Spark shell or REPL) that's provided by SBT.
  • hadoop/HFooBarN.scala - A driver program to run FooBarN in Hadoop. These small classes use a Scala library API for managing operating system processes. In this case, they invoke one or more shell scripts in the tutorial's scripts directory, which in turn call the Spark driver program $SPARK_HOME/bin/spark-submit, passing it the correct arguments. We'll explore the details shortly.
  • solns/FooBarNSomeExercise.scala - The solution to the "some exercise" exercise that's described in FooBarN.scala. These programs are also invoked using sbt run.

Otherwise, we don't use package prefixes, but only because they tend to be inconvenient. Most production code should use packages.

Introduction: What Is Spark?

Let's start with an overview of Spark, then discuss how to setup and use this tutorial.

Apache Spark is a distributed computing system written in Scala for distributed data programming.

Spark includes support for event stream processing, as well as more traditional batch-mode applications. There is a SparkSQL module for working with data sets through SQL queries. It integrates the core Spark API with embedded SQL queries with defined schemas. It also offers Hive integration so you can query existing Hive tables, even create and delete them. Finally, it has JSON support, where records written in JSON can be parsed automatically with the schema inferred and RDDs can be written as JSON.

There is also an interactive shell, which is an enhanced version of the Scala REPL (read, eval, print loop shell). SparkSQL adds a SQL-only REPL shell. For completeness, you can also use a custom Python shell that exposes Spark's Python API. A Java API is also supported and R support is under development.

Why Spark?

By 2013, it became increasingly clear that a successor was needed for the venerable Hadoop MapReduce compute engine. MapReduce applications are difficult to write, but more importantly, MapReduce has significant performance limitations and it can't support event-streaming ("real-time") scenarios.

Spark was seen as the best, general-purpose alternative, so all the major Hadoop vendors announced support for it in their distributions.

Spark Clusters

Let's briefly discuss the anatomy of a Spark cluster, adapting this discussion (and diagram) from the Spark documentation. Consider the following diagram:

Each program we'll write is a Driver Program. It uses a SparkContext to communicate with the Cluster Manager, which is an abstraction over Hadoop YARN, Mesos, standalone (static cluster) mode, EC2, and local mode.

The Cluster Manager allocates resources. An Executor JVM process is created on each worker node per client application. It manages local resources, such as the cache (see below) and it runs tasks, which are provided by your program in the form of Java jar files or Python scripts.

Because each application has its own executor process per node, applications can't share data through the Spark Context. External storage has to be used (e.g., the file system, a database, a message queue, etc.)

Resilient, Distributed Datasets

Three RDDs Partitioned Across a Cluster of Four Nodes

The data caching is one of the key reasons that Spark's performance is considerably better than the performance of MapReduce. Spark stores the data for the job in Resilient, Distributed Datasets (RDDs), where a logical data set is partitioned over the cluster.

The user can specify that data in an RDD should be cached in memory for subsequent reuse. In contrast, MapReduce has no such mechanism, so a complex job requiring a sequence of MapReduce jobs will be penalized by a complete flush to disk of intermediate data, followed by a subsequent reloading into memory by the next job.

RDDs support common data operations, such as map, flatmap, filter, fold/reduce, and groupby. RDDs are resilient in the sense that if a "partition" of data is lost on one node, it can be reconstructed from the original source without having to start the whole job over again.

The architecture of RDDs is described in the research paper Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing.


SparkSQL adds a new DataFrame type that wraps RDDs with schema information and the ability to run SQL queries on them. There is an integration with Hive, the original SQL tool for Hadoop, which lets you not only query Hive tables, but run DDL statements too. There is convenient support for reading and writing Parquet files and for reading and writing JSON-based records.

The Spark Version

This tutorial uses Spark 1.6.2.

The following documentation links provide more information about Spark:

The Documentation includes a getting-started guide and overviews. You'll find the Scaladocs API useful for the tutorial.

The Examples and Exercises

Here is a list of the examples, some of which have exercises embedded as comments. In subsequent sections, we'll dive into the details for each one. Note that each name ends with a number, indicating the order in which we'll discuss and try them:

  • Intro1-script: The first example is actually run interactively.
  • WordCount2: The Word Count algorithm: read a corpus of documents, tokenize it into words, and count the occurrences of all the words. A classic, simple algorithm used to learn many Big Data APIs. By default, it uses a file containing the King James Version (KJV) of the Bible. (The data directory has a README that discusses the sources of the data files.)
  • WordCount3: An alternative implementation of Word Count that uses a slightly different approach and also uses a library to handle input command-line arguments, demonstrating some idiomatic (but fairly advanced) Scala code.
  • Matrix4: Demonstrates using explicit parallelism on a simplistic Matrix application.
  • Crawl5a: Simulates a web crawler that builds an index of documents to words, the first step for computing the inverse index used by search engines. The documents "crawled" are sample emails from the Enron email dataset, each of which has been classified already as SPAM or HAM.
  • InvertedIndex5b: Using the crawl data, compute the index of words to documents (emails).
  • NGrams6: Find all N-word ("NGram") occurrences matching a pattern. In this case, the default is the 4-word phrases in the King James Version of the Bible of the form % love % %, where the % are wild cards. In other words, all 4-grams are found with love as the second word. The % are conveniences; the NGram Phrase can also be a regular expression, e.g., % (hat|lov)ed? % % finds all the phrases with love, loved, hate, and hated.
  • Joins7: Spark supports SQL-style joins as shown in this simple example.
  • SparkSQL8: Uses the SQL API to run basic queries over structured data in DataFrames, in this case, the same King James Version (KJV) of the Bible used in the previous tutorial. There is also a
  • SparkSQLFileFormats9: Demonstrates writing and reading Parquet-formatted data, namely the data written in the previous example.
  • hadoop/HiveSQL10: A script that demonstrates interacting with Hive tables (we actually create one) in the Scala REPL! This example is in a hadoop subdirectory, because it uses features that require a Hadoop setup (more details later on).
  • SparkStreaming11: The streaming capability is relatively new and this exercise shows how it works to construct a simple "echo" server. Running it is a little more involved. See below.

Let's now work through these exercises...



Our first exercise demonstrates the useful Spark Shell, which is a customized version of Scala's REPL (read, eval, print, loop). It allows us to work interactively with our algorithms and data.

Local Mode Execution

Actually, for local mode execution, we won't use the spark-shell command provided by Spark. Instead, we've customized the SBT console (Scala interpreter or "REPL") to behave in a similar way. For Hadoop execution, we'll use spark-shell.

We'll copy and paste commands from the file Intro1-script.scala. Open the file in your favorite editor/IDE.

The extensive comments in this file and the subsequent files explain the API calls in detail. You can copy and paste the comments, too.

NOTE: while the file extension is .scala, this file is not compiled with the rest of the code, because it works like a script. The SBT build is configured to not compile files with names that match the pattern *-script.scala.

To run this example, start the SBT console, e.g.,

$ sbt
> console

Now you are at the prompt for the Scala interpreter. Note the three prompts, $ for your command-line shell, > for SBT, and scala> for the Scala interpreter. Confusing, yes, but you'll grow accustom to them.

Continue with the instructions below.

Hadoop Execution

In your sandbox or cluster node, change to the root node of the tutorial and run the following command:


This script calls the actual Spark Shell script, $SPARK_HOME/bin/spark-shell and passes a --jars argument with the jar of the tutorial's compiled code. The script also passes any additional arguments you provide to spark-shell. (Try the --help option to see the full list.)

You'll see a lot of log messages, ending with the Scala REPL prompt scala>.

Continue with the instructions below.

An Interactive Spark Session

Whether you running the REPL in local mode or the spark-shell version in Hadoop, continue with the following steps.

First, there are some commented lines that every Spark program needs, but you don't need to run them now. Both the local Scala REPL configured in the build and the spark-shell variant of the REPL execute these three lines automatically at startup:

// import org.apache.spark.SparkContext
// import org.apache.spark.SparkContext._
// val sc = new SparkContext("local[*]", "Intro (1)")

The SparkContext drives everything else. Why are there two, very similar import statements? The first one imports the SparkContext type so it wouldn't be necessary to use a fully-qualified name in the new SparkContext statement. The second import statement is analogous to a static import in Java, where we make some methods and values visible in the current scope, again without requiring qualification.

When we start the console, there are other Scala expressions evaluated, too, involving the SparkSQL API. We'll come back to those below.

When a SparkContext is constructed, there are several constructors that can be used. The one shown takes a string for the "master" and an arbitrary job name. The master must be one of the following:

  • local: Start the Spark job standalone and use a single thread to run the job.
  • local[k]: Use k threads instead. Should be less than or equal to the number of cores. Use local[*] for all cores.
  • mesos://host:port: Connect to a running, Mesos-managed Spark cluster.
  • spark://host:port: Connect to a running, standalone Spark cluster.
  • yarn-client or yarn-cluster: Connect to a YARN cluster, which we'll use implicitly when we run in Hadoop.

So, actually, the comment shown is only correct for local mode. When you run spark-shell in Hadoop, the actual master argument used is yarn-client.

Next we define a read-only variable input of type RDD by loading the text of the King James Version of the Bible, which has each verse on a line, we then map over the lines converting the text to lower case:

val input = sc.textFile("data/kjvdat.txt").map(line => line.toLowerCase)

The data directory has a README that discusses the files present and where they came from.

Then, we cache the data in memory for faster, repeated retrieval. You shouldn't always do this, as it's wasteful for data that's simply passed through, but when your workflow will repeatedly reread the data, caching provides performance improvements.


Next, we filter the input for just those verses that mention "sin" (recall that the text is now lower case). Then count how many were found, convert the RDD to a Scala collection (in the memory for the driver process JVM). Finally, loop through the first twenty lines of the array, printing each one, then we do it again with RDD itself.

val sins = input.filter(line => line.contains("sin"))
val count = sins.count()         // How many sins?
val array = sins.collect()       // Convert the RDD into a collection (array)
array.take(20) foreach println   // Take the first 20, and print them 1/line.
sins.take(20) foreach println    // ... but we don't have to "collect" first;
                                 // we can just use foreach on the RDD.

Note: in Scala, the () in method calls are actually optional for no-argument methods.

Continuing, you can define functions as values. Here we create a separate filter function that we pass as an argument to the filter method. Previously we used an anonymous function. Note that filterFunc is a value that's a function of type String to Boolean.

val filterFunc: String => Boolean =
    (s:String) => s.contains("god") || s.contains("christ")

The following more concise form is equivalent, due to type inference of the argument's type:

val filterFunc: String => Boolean =
    s => s.contains("god") || s.contains("christ")

Now use the filter to find all the sin verses that also mention God or Christ, then count them. Note that this time, we drop the parentheses after "count". Parentheses can be omitted when methods take no arguments.

val sinsPlusGodOrChrist  = sins filter filterFunc
val countPlusGodOrChrist = sinsPlusGodOrChrist.count

Finally, let's do Word Count, where we load a corpus of documents, tokenize them into words and count the occurrences of all the words.

First, we'll define a helper method to look at the data. We need to import the RDD type:

import org.apache.spark.rdd.RDD

def peek(rdd: RDD[_], n: Int = 10): Unit = {
  println("RDD type signature: "+rdd+"\n")

In the type signature RDD[_], the _ means "any type". In other words, we don't care what records this RDD is holding, because we're just going to call toString on it (indirectly). The second argument n is the number of records to print. It has a default value of 10, which means if the caller doesn't provide this argument, we'll print 10 records.

The peek function prints the type of the RDD by calling toString on it (effectively). Then it takes the first n records, loops through them, and prints each one on a line.

Let's use peek to remind ourselves what the input value is. For this and the next few lines, I'll put in the scala> prompt, followed by the output:

scala> input
res27: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at map at <console>:20
scala> peek(input)
gen|1|1| in the beginning god created the heaven and the earth.~
gen|1|2| and the earth was without form, and void; and darkness was upon the face of the deep. and the spirit of god moved upon the face of the waters.~
gen|1|3| and god said, let there be light: and there was light.~
gen|1|4| and god saw the light, that it was good: and god divided the light from the darkness.~
gen|1|5| and god called the light day, and the darkness he called night. and the evening and the morning were the first day.~
gen|1|6| and god said, let there be a firmament in the midst of the waters, and let it divide the waters from the waters.~
gen|1|7| and god made the firmament, and divided the waters which were under the firmament from the waters which were above the firmament: and it was so.~
gen|1|8| and god called the firmament heaven. and the evening and the morning were the second day.~
gen|1|9| and god said, let the waters under the heaven be gathered together unto one place, and let the dry land appear: and it was so.~
gen|1|10| and god called the dry land earth; and the gathering together of the waters called he seas: and god saw that it was good.~

Note that input is a subtype of RDD called MapPartitionsRDD. and the RDD[String] means the "records" are just strings. (You might see a different name than res27.) You might confirm for yourself that the lines shown by peek(input) match the input data file.

Now, let's split each line into words. We'll treat any run of characters that don't include alphanumeric characters as the "delimiter":

scala> val words = input.flatMap(line => line.split("""[^\p{IsAlphabetic}]+"""))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at flatMap at <console>:22
scala> peek(words)

Does the output make sense to you? The type of the RDD hasn't changed, but the records are now individual words.

Now let's use our friend from SQL GROUPBY, where we use the words as the "keys":

scala> val wordGroups = words.groupBy(word => word)
wordGroups: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[27] at groupBy at <console>:23
scala> peek(wordGroups)
(winefat,CompactBuffer(winefat, winefat))
(honeycomb,CompactBuffer(honeycomb, honeycomb, honeycomb, honeycomb, honeycomb, honeycomb, honeycomb, honeycomb, honeycomb))
(bone,CompactBuffer(bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone, bone))
(glorifying,CompactBuffer(glorifying, glorifying, glorifying))
(nobleman,CompactBuffer(nobleman, nobleman, nobleman))
(hodaviah,CompactBuffer(hodaviah, hodaviah, hodaviah))
(hem,CompactBuffer(hem, hem, hem, hem, hem, hem, hem))
(onyx,CompactBuffer(onyx, onyx, onyx, onyx, onyx, onyx, onyx, onyx, onyx, onyx, onyx))
(pigeon,CompactBuffer(pigeon, pigeon))

Note that the records are now two-element Tuples: (String, Iterable[String]), where Iterable is a Scala abstraction for an underlying, sequential collection. We see that these iterables are CompactBuffers, a Spark collection that wraps an array of objects. Note that these buffers just hold repeated occurrences of the corresponding keys. This is wasteful, especially at scala! We'll learn a better way to do this calculation shortly.

Finally, let's compute the size of each CompactBuffer, which completes the calculation of how many occurrences are there for each word:

scala> val wordCounts1 = word_group => (word_group._1, word_group._2.size))
wordCounts1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[28] at map at <console>:24

scala> peek(wordCounts1)

Note that the function passed to map expects a single two-element Tuple argument. We extract the two elements using the _1 and _2 methods. (Tuples index from 1, rather than 0, following historical convention.) The type of wordCounts1 is RDD[(String,Int)].

There is a more concise syntax we can use for the method, which exploits pattern matching to break up the tuple into its constituents, which are then assigned to the value names:

scala> val wordCounts2 ={ case (word, group) => (word, group.size) }
wordCounts2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[28] at map at <console>:24

scala> peek(wordCounts2)

The results are exactly the same.

But there is actually an even easier way. Note that we aren't modifying the keys (the words), so we can use a convenience function mapValues, where only the value part (second tuple element) is passed to the anonymous function and the keys are retained:

scala> val wordCounts3 = wordGroups.mapValues(group => group.size)
wordCounts3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[31] at mapValues at <console>:24

scala> peek(wordCounts3)
// same as before

Finally, let's save the results to the file system:


If you look in the directory output/kjv-wc-groupby, you'll see three files:


The _SUCCESS file is empty. It's a marker used by the Hadoop File I/O libraries (which Spark uses) to signal to waiting processes that the file output has completed. The other two files each hold a partition of the data. In this case, we had two partitions.

We're done, but let's finish by noting that a non-script program should shutdown gracefully by calling sc.stop(). However, we don't need to do so here, because both our configured console environment for local execution and spark-shell do this for us:

// sc.stop()

If you exit the REPL immediately, this will happen implicitly. Still, it's a good practice to always call stop.

The Spark Web Console

When you have a SparkContext running, it provides a web UI with very useful information about how your job is mapped to JVM tasks, metrics about execution, etc.

Before we finish this exercise, open localhost:4040 and browse the UI. You will find this console very useful for learning Spark internals and when debugging problems.

Intro1-script Exercises

There are comments at the end of this file with suggested exercises to learn the API. All the subsequent examples we'll discuss include suggested exercises, too. Solutions for some of them are provided in the src/main/scala/sparktutorial/solns directory.

You can exit the Scala REPL now. Type :quit or use ^d (control-d - which means "end of input" for *NIX systems.).



The classic, simple Word Count algorithm is easy to understand and it's suitable for parallel computation, so it's a good vehicle when first learning a Big Data API.

In Word Count, you read a corpus of documents, tokenize each one into words, and count the occurrences of all the words globally.

WordCount2.scala uses the KJV Bible text again. (Subsequent exercises will add the ability to specify different input sources using command-line arguments.)

This example does not have a Hadoop version, so we'll only run it locally.

Use SBT to run it, using one of the following three methods:

  • Enter the run command and select the number corresponding to the WordCount2 program.
  • Enter run-main WordCount2
  • Enter ex2, a command alias for run-main WordCount2. (The alias is defined in the project's ./build.sbt file.)

Either way, the output is written to output/kjv-wc2 in the local file system. Use a file browser or another terminal window to view the files in this directory. You'll find an empty _SUCCESS file that marks completion and a part-00000 file that contains the data.

As before, here is the text of the script in sections, with code comments removed:

import util.FileUtil
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

We use the Java default package for the compiled exercises, but you would normally include a package ... statement to organize your applications into packages, in the usual Java way.

We import a FileUtil class that we'll use for "housekeeping". Then we use the same two SparkContext imports we discussed previously. This time, they aren't commented; we must specify these imports ourselves in Spark programs.

Even though most of the examples and exercises from now on will be compiled classes, you could still use the Spark Shell to try out most constructs. This is especially useful when experimenting and debugging!

Here is the outline of the rest of the program, demonstrating a pattern we'll use throughout.

object WordCount2 {
  def main(args: Array[String]): Unit = {

    val sc = new SparkContext("local", "Word Count (2)")

    try {
    } finally {
      sc.stop()      // Stop (shut down) the context.

In case the script fails with an exception, putting the SparkContext.stop() inside a finally clause ensures that we'll properly clean up no matter what happens.

The content of the try clause is the following:

val out = "output/kjv-wc2"
FileUtil.rmrf(out)    // Delete old output (if any)

val input = sc.textFile("data/kjvdat.txt").map(line => line.toLowerCase)

val wc = input
  .flatMap(line => line.split("""[^\p{IsAlphabetic}]+"""))
  .map(word => (word, 1))
  .reduceByKey((count1, count2) => count1 + count2)

println(s"Writing output to: $out")

Because Spark follows Hadoop conventions that it won't overwrite existing data, we delete any previous output, if any. Of course, you should only do this in production jobs when you know it's okay!

Next we load and cache the data like we did previously, but this time, it's questionable whether caching is useful, since we will make a single pass through the data. I left this statement here just to remind you of this feature.

Now we setup a pipeline of operations to perform the word count.

First the line is split into words using as the separator any run of characters that isn't alphabetic, e.g., digits, whitespace, and punctuation. (Note: using "\\W+" doesn't work well for non-UTF8 character sets!) This also conveniently removes the trailing ~ characters at the end of each line that exist in the file for some reason. input.flatMap(line => line.split(...)) maps over each line, expanding it into a collection of words, yielding a collection of collections of words. The flat part flattens those nested collections into a single, "flat" collection of words.

The next two lines convert the single word "records" into tuples with the word and a count of 1. In Shark, the first field in a tuple will be used as the default key for joins, group-bys, and the reduceByKey we use next.

The reduceByKey step effectively groups all the tuples together with the same word (the key) and then "reduces" the values using the passed in function. In this case, the two counts are added together. Hence, we get two-element records with unique words and their counts.

Finally, we invoke saveAsTextFile to write the final RDD to the output location.

Note that the input and output locations will be relative to the local file system, when running in local mode, and relative to the user's home directory in HDFS (e.g., /user/$USER), when a program runs in Hadoop.

Spark also follows another Hadoop convention for file I/O; the out path is actually interpreted as a directory name. It will contain the same _SUCCESS and part-00000 files discussed previously. In a real cluster with lots of data and lots of concurrent tasks, there would be many part-NNNNN files.

Quiz: If you look at the (unsorted) data, you'll find a lot of entries where the word is a number. (Try searching the input text file to find them.) Are there really that many numbers in the bible? If not, where did the numbers come from? Look at the original file for clues.

WordCount2 Exercises

At the end of each example source file, you'll find exercises you can try. Solutions for some of them are implemented in the solns package. For example, solns/WordCount2GroupBy.scala solves the "group by" exercise described in WordCount2.scala.



This exercise also implements Word Count, but it uses a slightly simpler approach. It also uses a utility library to support command-line arguments, demonstrating some idiomatic (but fairly advanced) Scala code. We won't worry about the details of this utility code, just how to use it. When we set up the SparkContext, we also use Kryo Serialization, which provides better compression and therefore better utilization of memory and network bandwidth.

You can run this example in both local mode or using Hadoop.

This version also does some data cleansing to improve the results. The sacred text files included in the data directory, such as kjvdat.txt are actually formatted records of the form:


That is, pipe-separated fields with the book of the Bible (e.g., Genesis, but abbreviated "Gen"), the chapter and verse numbers, and then the verse text. We just want to count words in the verses, although including the book names wouldn't change the results significantly. (Now you can figure out the answer to the "quiz" in the previous section...)

Running WordCount3

Using SBT, do one of the following:

  • Enter the run command and select the number corresponding to the WordCount3 program.
  • Enter run-main WordCount3
  • Enter ex3, a command alias for run-main WordCount3.

To run this example in Hadoop, use the hadoop.HWordCount3 program instead. Run it in one of the same ways as for WordCount3:

  • Enter the run command and select the number corresponding to the hadoop.HWordCount3 program.
  • Enter run-main hadoop.HWordCount3
  • Enter hex3, a command alias for run-main hadoop.HWordCount3.

We'll discuss the Hadoop driver in more detail later.

Command line options can be used to override the default settings for input and output locations, among other things. You can specify arguments after the run, run-main ..., ex3, or hex3 commands.

Here is the help message that lists the available options. The "\" characters indicate long lines that are wrapped to fit. Enter the commands on a single line without the "\". Following Unix conventions, [...] indicates optional arguments, and | indicates alternatives:

run-main WordCount3 [ -h | --help] \
  [-i | --in | --inpath input] \
  [-o | --out | --outpath output] \
  [-m | --master master] \
  [-q | --quiet]

Where the options have the following meanings:

-h | --help     Show help and exit.
-i ... input    Read this input source (default: data/kjvdat.txt).
-o ... output   Write to this output location (default: output/kjvdat-wc3).
-m ... master   local, local[k], yarn-client, etc., as discussed previously.
-q | --quiet    Suppress some informational output.

Try running ex3 -h to see the help message.

When running in Hadoop, relative file paths for input our output are interpreted to be relative to /user/$USER in HDFS.

Here is an example that uses the default values for the options:

run-main WordCount3 \
  --inpath data/kjvdat.txt --output output/kjv-wc3 \
  --master local

You can try different variants of local[k] for the master option, but keep k less than the number of cores in your machine or use *.

When you specify an input path for Spark, you can specify bash-style "globs" and even a list of them:

  • data/foo: Just the file foo or if it's a directory, all its files, one level deep (unless the program does some extra handling itself).
  • data/foo*.txt: All files in data whose names start with foo and end with the .txt extension.
  • data/foo*.txt,data2/bar*.dat: A comma-separated list of globs.

Okay, with all the invocation options out of the way, let's walk through the implementation of WordCount3.

WordCount3 Code Walkthrough


We start with import statements:

import util.{CommandLineOptions, FileUtil, TextUtil}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

As before, but with our new CommandLineOptions utilities added.

object WordCount3 {
  def main(args: Array[String]): Unit = {

    val options = CommandLineOptions(

    val argz   = options(args.toList)
    val master = argz("master")
    val quiet  = argz("quiet").toBoolean
    val in     = argz("input-path")
    val out    = argz("output-path")

I won't discuss the implementation of CommandLineOptions.scala except to say that it defines some methods that create instances of an Opt type, one for each of the options we discussed above. The single argument given to some of the methods (e.g., CommandLineOptions.inputPath("data/kjvdat.txt")) specifies the default value for that option.

After parsing the options, we extract some of the values we need.

Next, if we're running in local mode, we delete the old output, if any:

    if (master.startsWith("local")) {
      if (!quiet) println(s" **** Deleting old output (if any), $out:")

Note that this logic is only invoked in local mode, because FileUtil only works locally. We also delete old data from HDFS when running in Hadoop, but deletion is handled through a different mechanism, as we'll see shortly.

Now we create a SparkConf to configure the SparkContext with the desired master setting, application name, and the use of Kryo serialization.

    val name = "Word Count (3)"
    val conf = new SparkConf().
      set("", name).   // To silence Metrics warning.
      set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

If the data had a custom type, we would want to register it with Kryo, which already handles common types, like String, which is all we use here for "records". For serializing your classes, replace this line:

      set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

with this line:


Actually, it's harmless to leave in the set("spark.serializer", ...), but it's done for you inside registerKryoClasses.

Now we process the input as before, with a few changes...

    try {
      val input = sc.textFile(in)
        .map(line => TextUtil.toText(line)) // also converts to lower case

It starts out much like WordCount2, but it uses a helper method TextUtil.toText to split each line from the religious texts into fields, where the lines are of the form: book|chapter#|verse#|text. The | is the field delimiter. However, if other inputs are used, their text is returned unmodified. As before, the input reference is an RDD.

Note that I omitted a subsequent call to input.cache as in WordCount2, because we are making a single pass through the data.

      val wc2 = input
        .flatMap(line => line.split("""[^\p{IsAlphabetic}]+"""))
        .countByValue()  // Returns a Map[T, Long]

Take input and split on non-alphabetic sequences of character as we did in WordCount2, but rather than map to (word, 1) tuples and use reduceByKey, we simply treat the words as values and call countByValue to count the unique occurrences. Hence, this is a simpler and more efficient approach.

      val wc2b = => s"${key_value._1},${key_value._2}").toSeq
      val wc2 = sc.makeRDD(wc2b, 1)

      if (!quiet) println(s"Writing output to: $out")

    } finally {

The result of countByValue is a Scala Map, not an RDD, so we format the key-value pairs into a sequence of strings in comma-separated value (CSV) format. The we convert this sequence back to an RDD with makeRDD. Finally, we save to the file system as text.

WARNING: Methods like countByValue that return a Scala collection will copy the entire object back to the driver program. This could crash your application with an OutOfMemory exception if the collection is too big!

Running as a Hadoop Job

Let's discuss Hadoop execution in a little more detail.

Recall from the setup instructions that the data must already be in HDFS. The location /user/$USER/data is assumed. If you used a different location, always specify the --input argument when you run the examples.

The driver program, hadoop.HWordCound3, is used to run WordCount3 in Hadoop. It is run with SBT using run-main hadoop.HWordCound3 or alias hex3 command. Try it now.

The output is more verbose and the execution time is longer, due to Hadoop's overhead. The end of the output shows a URL for the Hue UI that's also part of the Sandbox. Open your browser to that URL to look at the data. The content will be very similar to the output of the previous, local run, but it will be formatted differently and the word-count pairs will be in a different (random) order.

For convenient, there is also a bash shell script for this example in the scripts directory, scripts/


dir=$(dirname $0)
$dir/ --class WordCount3 --output "$output" "[email protected]"

It calls a scripts/ script in the same directory, which deletes the old output from HDFS, if any, and calls Spark's $SPARK_HOME/bin/spark-submit to submit the job to YARN. One of the arguments it passes to spark-submit is the jar file containing all the project code. This jar file is built automatically anytime you invoke the SBT run command.

The other examples also have corresponding scripts and driver programs.

Let's return to hadoop.HWordCound3, which is quite small:

package hadoop
import util.Hadoop

object HWordCount3 {
  def main(args: Array[String]): Unit = {
    Hadoop("WordCount3", "output/kjv-wc3", args)

It accepts the same options as WordCount3, although the --master option defaults to yarn-client this time.

It delegates to a helper class util.Hadoop to do the work. It passes as arguments the class name and the output location. The second argument is a hack: the default output path must be specified here, even though the same default value is also encoded in the application. This is because we eventually pass the value to the scripts/ script, which uses it to delete an old output directory, if any.

Here is the Hadoop helper class:

package util
import scala.sys.process._

object Hadoop {
  def apply(className: String, defaultOutpath: String, args: Array[String]): Unit = {
    val user = sys.env.get("USER") match {
      case Some(user) => user
      case None =>
        println("ERROR: USER environment variable isn't defined. Using root!")

    // Did the user specify an output path? Use it instead.
    val predicate = (arg: String) => arg.startsWith("-o") || arg.startsWith("--o")
    val args2 = args.dropWhile(arg => !predicate(arg))
    val outpath = if (args2.size == 0) s"/user/$user/$defaultOutpath"
      else if (args2(1).startsWith("/")) args2(1)
      else s"/user/$user/${args(2)}"

    // We don't need to remove the output argument. A redundant occurrence
    // is harmless.
    val argsString = args.mkString(" ")
    val exitCode = s"scripts/ --class $className --out $outpath $argsString".!
    if (exitCode != 0) sys.exit(exitCode)

It tries to determine the user name and whether or not the user explicitly specified an output argument, which should override the hard-coded value.

Finally, it invokes the scripts/ script we mentioned above, so that we go thorugh Spark's spark-submit script for submitting to the Hadoop YARN cluster.

WordCount3 Exercises


Don't forget the try the exercises at the end of the source file.

For Hadoop execution, you'll need to edit the source code on cluster or edge node or the sandbox. One way is to simply use an editor on the node, i.e., vi or emacs to edit the code. Another approach is to use the secure copy command, scp, to copy edited sources to and from your workstation.

For sandboxes, the best approach is to share this tutorial's root directory between your workstation and the VM Linux instance. This will allow you to edit the code in your workstation environment with the changes immediately available in the VM. See the documentation for your VM runner for details on sharing folders.

For example, in VMWare, the Sharing panel lets you specify workstation directories to share. In the Linux VM, run the following commands as root to mount all shared directories under /home/shares (or use a different location):

mkdir -p /home/shares
mount -t vmhgfs .host:/ /home/shares

Now any shared workstation folders will appear under /home/shares.



An early use for Spark was implementing Machine Learning algorithms. Spark's MLlib of algorithms contains classes for vectors and matrices, which are important for many ML algorithms. This exercise uses a simpler representation of matrices to explore another topic; explicit parallelism.

The sample data is generated internally; there is no input that is read. The output is written to the file system as before.

Here is the run-main command with optional arguments:

run-main Matrix4 [ -h | --help] \
  [-d | --dims NxM] \
  [-o | --out | --outpath output] \
  [-m | --master master] \
  [-q | --quiet]

The one new optin is for specifying the dimensions, where the string NxM is parsed to mean N rows and M columns. The default is 5x10.

Like for WordCount3, there is also a ex4 short cut for run-main Matrix4.

For Hadoop, run hadoop.HMatrix4 using run-main hadoop.HMatrix4, the hex4 alias, our use the bash script scripts/

We won't cover all the code from now on; we'll skip the familiar stuff:

import util.Matrix

object Matrix4 {

  case class Dimensions(m: Int, n: Int)

  def main(args: Array[String]): Unit = {

    val options = CommandLineOptions(...)
    val argz   = options(args.toList)

    val dimsRE = """(\d+)\s*x\s*(\d+)""".r
    val dimensions = argz("dims") match {
      case dimsRE(m, n) => Dimensions(m.toInt, n.toInt)
      case s =>
        println("""Expected matrix dimensions 'NxM', but got this: $s""")

Dimensions is a convenience class for capturing the default or user-specified matrix dimensions. We parse the argument string to extract N and M, then construct a Dimension instance.

    val sc = new SparkContext(...)

    try {
      // Set up a mxn matrix of numbers.
      val matrix = Matrix(dimensions.m, dimensions.n)

      // Average rows of the matrix in parallel:
      val sums_avgs = sc.parallelize(1 to dimensions.m).map { i =>
        // Matrix indices count from 0.
        // "_ + _" is the same as "(count1, count2) => count1 + count2".
        val sum = matrix(i-1) reduce (_ + _)
        (sum, sum/dimensions.n)
      }.collect    // convert to an array

The core of this example is the use of SparkContext.parallelize to process each row in parallel (subject to the available cores on the machine or cluster, of course). In this case, we sum the values in each row and compute the average.

The argument to parallelize is a sequence of "things" where each one will be passed to one of the operations. Here, we just use the literal syntax to construct a sequence of integers from 1 to the number of rows. When the anonymous function is called, one of those row numbers will get assigned to i. We then grab the i-1 row (because of zero indexing) and use the reduce method to sum the column elements. A final tuple with the sum and the average is returned.

      // Make a new sequence of strings with the formatted output, then we'll
      // dump to the output location.
      val outputLines = Vector(          // Scala's Vector, not MLlib's version!
        s"${dimensions.m}x${dimensions.n} Matrix:") ++ {
        case ((sum, avg), index) =>
          f"Row #${index}%2d: Sum = ${sum}%4d, Avg = ${avg}%3d"
      val output = sc.makeRDD(outputLines)  // convert back to an RDD
      if (!quiet) println(s"Writing output to: $out")
    } finally { ... }

The output is formatted as a sequence of strings and converted back to an RDD for output. The expression sums_avgs.zipWithIndex creates a tuple with each sums_avgs value and it's index into the collection. We use that to add the row index to the output.

Try the simple exercises at the end of the source file.



The fifth example is in two-parts. The first part simulates a web crawler that builds an index of documents to words, the first step for computing the inverse index used by search engines, from words to documents. The documents "crawled" are sample emails from the Enron email dataset, each of which has been previously classified already as SPAM or HAM.

Crawl5a supports the same command-line options as WordCount3:

run-main Crawl5a [ -h | --help] \
  [-i | --in | --inpath input] \
  [-o | --out | --outpath output] \
  [-m | --master master] \
  [-q | --quiet]

Run with run-main Crawl5a, or the ex5a alias.

Crawl5a uses a convenient SparkContext method wholeTextFiles, which is given a directory "glob". The default we use is data/enron-spam-ham/*, which expands to data/enron-spam-ham/ham100 and data/enron-spam-ham/spam100. This method returns records of the form (file_name, file_contents), where the file_name is the absolute path to a file found in one of the directories, and file_contents contains its contents, including nested linefeeds. To make it easier to run unit tests, Crawl5a strips off the leading path elements in the file name (not normally recommended) and it removes the embedded linefeeds, so that each final record is on a single line.

Here is an example line from the output :

(0038.2001-08-05.SA_and_HP.spam.txt,  Subject: free foreign currency newsletter ...)

The next step has to parse this data to generate the inverted index.

Note: There is also an older Crawl5aLocal included but no longer used. It works similarly, but for local file systems only.



Using the crawl data just generated, compute the index of words to documents (emails). This is a simple approach to building a data set that could be used by a search engine. Each record will have two fields, a word and a list of tuples of documents where the word occurs and a count of the occurrences in the document.

InvertedIndex5b supports the usual command-line options:

run-main InvertedIndex5b [ -h | --help] \
  [-i | --in | --inpath input] \
  [-o | --out | --outpath output] \
  [-m | --master master] \
  [-q | --quiet]

For Hadoop, run hadoop.HInvertedIndex5b using run-main hadoop.HInvertedIndex5b or the hex5b alias. There is also a bash script scripts/

The code outside the try clause follows the usual pattern, so we'll focus on the contents of the try clause:

try {
  val lineRE = """^\s*\(([^,]+),(.*)\)\s*$""".r
  val input = sc.textFile(argz("input-path")) map {
    case lineRE(name, text) => (name.trim, text.toLowerCase)
    case badLine =>
      Console.err.println("Unexpected line: $badLine")
      ("", "")

We load the "crawl" data, where each line was written by Crawl5a with the following format: (document_id, text) (including the parentheses). Hence, we use a regular expression with "capture groups" to extract the document_id and text.

Note the function passed to map. It has the form:

  case lineRE(name, text) => ...
  case line => ...

There is now explicit argument list like we've used before. This syntax is the literal syntax for a partial function, a mathematical concept for a function that is not defined at all of its inputs. It is implemented with Scala's PartialFunction type.

We have two case match clauses, one for when the regular expression successfully matches and returns the capture groups into variables name and text and the second which will match everything else, assigning the line to the variable badLine. (In fact, this catch-all clause makes the function total, not partial.) The function must return a two-element tuple, so the catch clause simply returns ("","").

Note that the specified or default input-path is a directory with Hadoop-style content, as discussed previously. Spark knows to ignore the "hidden" files.

The embedded comments in the rest of the code explains each step:

if (!quiet)  println(s"Writing output to: $out")

// Split on non-alphabetic sequences of character as before.
// Rather than map to "(word, 1)" tuples, we treat the words by values
// and count the unique occurrences.
  .flatMap {
    // all lines are two-tuples; extract the path and text into variables
    // named "path" and "text".
    case (path, text) =>
      // If we don't trim leading whitespace, the regex split creates
      // an undesired leading "" word!
      text.trim.split("""[^\p{IsAlphabetic}]+""") map (word => (word, path))
  .map {
    // We're going to use the (word, path) tuple as a key for counting
    // all of them that are the same. So, create a new tuple with the
    // pair as the key and an initial count of "1".
    case (word, path) => ((word, path), 1)
  .reduceByKey{    // Count the equal (word, path) pairs, as before
    (count1, count2) => count1 + count2
  .map {           // Rearrange the tuples; word is now the key we want.
    case ((word, path), n) => (word, (path, n))
  .groupByKey      // There is a also a more general groupBy
  // reformat the output; make a string of each group,
  // a sequence, "(path1, n1) (path2, n2), (path3, n3)..."
  .mapValues(iterator => iterator.mkString(", "))
  // mapValues is like the following map, but more efficient, as we skip
  // pattern matching on the key ("word"), etc.
  // .map {
  //   case (word, seq) => (word, seq.mkString(", "))
  // }
} finally {

Each output record has the following form: (word, (doc1, n1), (doc2, n2), ...). For example, the word "ability" appears twice in one email and once in another (both SPAM):


Related Repositories



SnappyData: OLTP + OLAP Database built on Apache Spark ...



Best practices of using Spark for practicing data scientists in the context of a ...



An example of using Avro and Parquet in Spark SQL ...



An AWS Lambda function in Scala reading events from Amazon Kinesis and writing e ...



A curated list of awesome Scala frameworks, libraries and software. ...

Top Contributors

chicagoscala deanwampler bigsnarfdude abatyuk gitter-badger epishkin