Spark Streaming, Kafka and Cassandra Tutorial

This tutorial builds on our basic “Getting Started with Instaclustr Spark and Cassandra” tutorial to demonstrate how to set up Apache Kafka and use it to send data to Spark Streaming, where it is summarised before being saved to Cassandra.

The high-level steps to be followed are:

  1. Set up your environment.
  2. Build the sample.
  3. Run the sample.

1. Set Up Your Environment

To set up your environment, first follow the steps in sections 1 (Provision a Cluster with Cassandra + Spark) and 2 (Set up a Spark client) of the Getting Started tutorial here: https://support.instaclustr.com/hc/en-us/articles/213097877-Getting-Started-with-Instaclustr-Spark-Cassandra-

Once this is complete, install and start Kafka:

  1. Download Kafka:
cd ~
wget http://www-us.apache.org/dist/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz
  2. Unpack the files and change into the Kafka directory (this download is a binary release, so no separate build step is required):
tar xzf kafka_2.10-0.9.0.1.tgz
cd kafka_2.10-0.9.0.1
  3. Start ZooKeeper and then Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
  4. Run the Kafka producer test harness to send some test messages:
bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
  5. With the test harness running, type some random messages, then press Ctrl-D to finish. You will see a lot of log output when you type the first entry (related to the test harness setting up the channel); subsequent entries should produce no errors.
  6. Run the consumer test harness to retrieve the messages:
bin/kafka-console-consumer.sh --topic test --zookeeper localhost:2181 --from-beginning
  7. You should see the messages you typed in earlier played back to you.
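
The console tools are all this tutorial needs, but if you would rather send messages programmatically, a minimal Scala sketch using the Kafka 0.9 producer API might look like the following (the topic name and broker address match those used above; the object name is illustrative and this file is not part of the sample project):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerSketch {
  def main(args: Array[String]): Unit = {
    // Minimal producer configuration: broker address and String serializers.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send a single message to the "test" topic, then close the producer.
    producer.send(new ProducerRecord[String, String]("test", "hello from scala"))
    producer.close()
  }
}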

2. Build the Sample

We have loaded a sample project, including the build, source and configuration files, to GitHub. To build it:

  1. Clone the repository:
cd ~
git clone https://github.com/instaclustr/sample-KafkaSparkCassandra.git
  2. Build the project:
cd sample-KafkaSparkCassandra
sbt assembly
  3. Set your local configuration by either overwriting cassandra-count.conf with the file you created in the previous tutorial, or editing the template from the repository to replace the values in <> brackets (a sample of the template format is shown below).
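
The configuration file is passed to spark-submit via --properties-file, so it is simply a list of Spark properties, one per line. A sketch of what the template typically contains, with the <> placeholders still in place (take the exact property names and values from the file in the repository), is:

spark.master spark://<spark_master_private_ip>:7077
spark.cassandra.connection.host <cassandra_node_private_ip>
spark.cassandra.auth.username <cluster_username>
spark.cassandra.auth.password <cluster_password>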

The repository contains 4 active files:

  • build.sbt: the project file that specifies dependencies.
  • cassandra-count.conf: configuration file with IPs, username, password.
  • src/main/scala/KafkaSparkCassandra.scala: the Scala file containing the actual application. The code is heavily commented to explain what is going on.
  • project/assembly.sbt: sbt plugin config to package dependencies in the target jar.
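
For reference, an sbt-assembly plugin declaration is typically a single line like the one below (the version shown is illustrative; the file in the repository may pin a different one):

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")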

When executed, the application will do the following (a sketch of this flow appears after the list):

  1. Connect directly from the Spark driver to Cassandra and create a keyspace and table to store results, if they do not already exist.
  2. Start a Spark Streaming session connected to Kafka, summarise the messages received in each 5-second period by counting words, and save the summary results to Cassandra.
  3. Stop the streaming session after 30 seconds.
  4. Use Spark SQL to connect to Cassandra and extract the summary results that were saved to the table.
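
As a rough guide to what that looks like in code, here is a condensed sketch using the Spark 1.6-era Streaming, Kafka and Cassandra connector APIs. It is not the exact contents of KafkaSparkCassandra.scala, and the keyspace, table, topic and group names are illustrative:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.streaming._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KafkaSparkCassandraSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())  // settings arrive via --properties-file

    // 1. Create the keyspace and table directly from the driver, if required.
    CassandraConnector(sc.getConf).withSessionDo { session =>
      session.execute("CREATE KEYSPACE IF NOT EXISTS kafka_sample WITH replication = " +
        "{'class': 'SimpleStrategy', 'replication_factor': 1}")
      session.execute("CREATE TABLE IF NOT EXISTS kafka_sample.wordcount " +
        "(word text, ts timestamp, count int, PRIMARY KEY (word, ts))")
    }

    // 2. Stream from Kafka in 5-second batches, count words in each batch
    //    and save each batch summary to Cassandra.
    val ssc = new StreamingContext(sc, Seconds(5))
    val lines = KafkaUtils
      .createStream(ssc, "localhost:2181", "sample-group", Map("test" -> 1))
      .map { case (_, value) => value }
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .map { case (word, count) => (word, new java.util.Date(), count) }
      .saveToCassandra("kafka_sample", "wordcount", SomeColumns("word", "ts", "count"))

    // 3. Run the streaming session for 30 seconds, then stop it,
    //    keeping the underlying SparkContext alive.
    ssc.start()
    ssc.awaitTerminationOrTimeout(30000)
    ssc.stop(stopSparkContext = false, stopGracefully = false)

    // 4. Read the saved summary back with Spark SQL and print it.
    val sqlContext = new SQLContext(sc)
    sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "kafka_sample", "table" -> "wordcount"))
      .load()
      .show()

    sc.stop()
  }
}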

3. Run the Sample

At this stage, Kafka should still be running from the first step. We need to run the Kafka producer test harness and the Spark sample app at the same time, so it is easiest to have two console windows open. Once both windows are open and logged in, follow these steps:

  1. In your first console window, start the Kafka producer test harness:
cd ~/kafka_2.10-0.9.0.1/
bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
  2. In the second console window, submit your Spark job:
cd ~/sample-KafkaSparkCassandra/
~/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --properties-file cassandra-count.conf --class KafkaSparkCassandra target/scala-2.10/cassandra-kafka-streaming-assembly-1.0.jar
  3. Switch back to the Kafka producer console window and enter some test messages for 20 seconds or so.
  4. Switch back to the Spark console window. Amid the streams of log messages, you should see something like the following, which is the summary from a single Spark Streaming batch:
-------------------------------------------
Time: 1447649930000 ms
-------------------------------------------
(word1,1447649930162,1)
(word2,1447649930162,3)
(word3,1447649930234,9)
  5. After 30 seconds of streaming has passed, you should see output like the following, which is a dump of the Cassandra table:
+-------+--------------------+-----+
|   word|                  ts|count|
+-------+--------------------+-----+
|  word1|2015-11-16 04:58:...|    4|
|  word1|2015-11-16 04:58:...|    5|
|  word1|2015-11-16 04:58:...|    9|
|  word2|2015-11-16 04:58:...|    2|
|  word3|2015-11-16 04:58:...|    1|
|  word3|2015-11-16 04:58:...|    1|
+-------+--------------------+-----+
If you have questions regarding this article, feel free to add them in the comments below.
