Getting Started with Instaclustr Spark & Cassandra

Our Spark offering has been generally available since our release of Spark 1.6.0. If you choose to provision your cluster with Zeppelin, you can quickly write Spark jobs through the Zeppelin interface, available from the Instaclustr console. You will find more details in this article: https://support.instaclustr.com/hc/en-us/articles/214940967

The rest of this tutorial focuses on the other way of submitting jobs to your Spark cluster. More precisely, it walks you through the basic steps of setting up an Instaclustr Cassandra cluster with Spark on Amazon Web Services and submitting an analytics job to Spark. The high-level steps are:

  1. Provision a cluster with Cassandra + Spark
  2. Set up a Spark client
  3. Configure network access
  4. Basic Interaction with Spark Shell
  5. Using Spark SQL from Spark Shell
  6. Submit a Scala job

This tutorial assumes that you are familiar with launching and connecting to servers in AWS.

While this tutorial is specific to AWS, we also support Spark on Azure and IBM SoftLayer. You can follow a similar approach to set up on those platforms or contact support@instaclustr.com if you need more detailed instructions.

1       Provision a Cluster with Cassandra + Spark

Provision Base Cluster

Once you have signed up for Instaclustr and verified your email, log in to the Instaclustr console and click the “Create Cassandra Cluster” button. On the create cluster screen, choose:

  • Amazon Web Services as the Infrastructure Provider
  • Apache Cassandra 2.1.x / 2.2.x or DataStax Enterprise 4.8.x version of Cassandra
  • Apache Spark as an Add-on
  • A minimum node size of t2.medium
  • Do not enable client encryption (our current Spark release does not support client encryption)
  • Use Private IP Addresses for node discovery

Accept the terms and conditions and click “Create Cluster”. The cluster will provision automatically and is ready to use once all nodes are in the running state.

2       Set Up a Spark Client

To use your Spark cluster, you will need a client machine set up to submit jobs. Use the following steps to set up a client in AWS:

1. Provision a new AWS server with the following configuration:

  1. Region: same as your newly created Cassandra+Spark cluster
  2. VPC: if possible, use a VPC with DNS resolution and DNS hostnames enabled. The VPC network range should not overlap with the network range of your Instaclustr cluster.
  3. AMI: Ubuntu Server 14.04 LTS (HVM), SSD Volume Type
  4. Size: t2.small is sufficient for this tutorial and for many ongoing use cases

2. Log in to the newly launched server with ubuntu as the username.
3. Download the Spark version matching your Instaclustr cluster version, in this case Spark 1.6.0:

wget https://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz

4. Extract the Spark files:

tar -xvf spark-1.6.0-bin-hadoop2.6.tgz

5. Download the Spark Cassandra connector assembly jar (a fat jar built by Instaclustr that includes all required dependencies, to be used with the Spark shell). The latest version available for your Spark version is linked from the "Connection Info" page of the Instaclustr console:

wget https://static.instaclustr.com/spark/spark-cassandra-connector-assembly-1.6.2.jar

6. Install the Java Development Kit:

sudo apt-get install default-jdk

7. If you are not using a VPC with DNS resolution and DNS hostnames enabled, you will need to change the hostname of the client to its private IP so that it resolves when used by Spark (a bit of a hack – the right way is to edit /etc/hosts, but this is quicker):

sudo hostname <spark client private IP>

8. If you will be building the final Scala example, install sbt:

echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt

3       Configure Network Access

As Spark has minimal built-in security, we recommend that you access Spark from a peered VPC in AWS to increase the security provided by network-based access rules.

To set up the peered VPC and allow connections from your VPC to the cluster, follow the instructions here:

https://support.instaclustr.com/hc/en-us/articles/203559854-Using-VPC-Peering-AWS 

Note: When following the VPC Peering instructions, you must add your VPC network range to the “Spark Allowed Addresses” and the “Cassandra Allowed Addresses”. The Spark driver on your client machine needs to be able to connect to Cassandra as well as the Spark workers (to establish partition ranges). 

As well as connections from the Spark client to the cluster, the architecture of Spark means that the Spark cluster needs to be able to connect back to the client. Enable this in AWS by editing the security group associated with your Spark client to add an Inbound rule with the following values:

  • Type: Custom TCP Rule
  • Protocol: TCP
  • Port Range: 1024-65535
  • Source: Custom IP, <your cluster network range> (viewable from the cluster details page in the Instaclustr console)

 

4       Basic Interaction with Spark Shell

We will now connect to the Spark cluster using the Spark Shell and run an analytic job. (Note: sometimes the log messages from Spark shell overwrite the shell prompt. If processing seems to have stopped with no prompt then hit the enter key to get a prompt.)

1. Note the IPs of the three Spark masters in your cluster – these are viewable on the Spark tab of the Instaclustr console for your cluster.

2. Log in to your Spark client and run the following command (replace the placeholders in <> with your Spark master IPs, the private IP of one of your Cassandra nodes, and the iccassandra password for your cluster):

cd ~/spark-1.6.0-bin-hadoop2.6/bin
./spark-shell --master spark://<spark_master_IP1>:7077,<spark_master_IP2>:7077,<spark_master_IP3>:7077  --conf spark.cassandra.connection.host=<cassandra_private_IP>   --conf spark.cassandra.auth.username=iccassandra  --conf spark.cassandra.auth.password=<iccassandra password>  --jars ~/spark-cassandra-connector-assembly-1.6.2.jar

3. The Spark shell should start without any errors. There will be a lot of log messages. Once it has fully started, you will see the prompt: “scala>”.

4. Some imports are necessary. For this simple job, enter the following at the prompt:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import com.datastax.spark.connector._

5. Now we can create an RDD and execute an action on it. Only the action (rdd.count) triggers the calculation. In this case, we use the "system" keyspace, which Cassandra uses to keep track of internals such as the list of keyspaces:

val rdd = sc.cassandraTable("system","schema_keyspaces")
println("Row count: " + rdd.count)

6. You should see a lot of log messages followed by the row count message.
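
If you want to explore the returned data a little further, the RDD consists of CassandraRow objects and can be transformed like any other RDD. The following lines are a minimal sketch (run in the same Spark shell session, against the same system.schema_keyspaces table) that prints the name of each keyspace:

// Extract the keyspace_name column from each CassandraRow
val names = rdd.map(row => row.getString("keyspace_name"))
// collect() brings the results back to the driver; fine here because the table is tiny
names.collect().foreach(println)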

5       Using Spark SQL from Spark Shell

Spark SQL allows you to run complex SQL queries against Cassandra data. The following steps demonstrate how to execute a Spark SQL query against Cassandra using the Spark Cassandra connector. Execute them in the same Spark shell session where you ran the previous example:

  1. Import the required libraries:
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql
  2. Create the Cassandra SQL context:
val csc = new CassandraSQLContext(sc)
  3. Run a query and print the results:
val rdd1 = csc.sql("SELECT count(*) from system.schema_keyspaces")
println("Row count: " + rdd1.first()(0))
  4. You can also try a slightly more sophisticated query:
csc.sql("SELECT keyspace_name, count(*) from system.schema_columnfamilies group by keyspace_name").collect().foreach(println)

6       Creating and Submitting a Scala Job

In this step of the tutorial we will demonstrate how to build and submit a Scala job. This is useful when you wish to create a job once and submit it multiple times.

  1. Log in to your Spark client machine
  2. Create required directories for your project:
mkdir ~/cassandra-count
cd cassandra-count
mkdir -p src/main/scala
mkdir project
  3. Create a file called build.sbt in the cassandra-count directory with the following contents (note: the blank lines are important):
name := "cassandra-count"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.2"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0" % "provided"

assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) {
  (old) => {
    case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
    case x => old(x)
  }
}
  4. Create a file called assembly.sbt in the cassandra-count/project directory with the following contents (this will include the required dependencies in the output jar):
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
  5. Create a file called cassandra-count.scala in the cassandra-count/src/main/scala directory with the following contents:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
object cassandraCount {
  def main(args: Array[String]) {
    // 1. Create a conf for the Spark context.
    // In this example, the Spark master and Cassandra connection info are provided in the separate cassandra-count.conf file.
    val conf = new SparkConf().setAppName("Counting rows of a cassandra table")
    
    // 2. Create a spark context
    val sc = new SparkContext(conf)
 
    // 3. Create an RDD that connects to the Cassandra table "schema_keyspaces" in the keyspace "system"
    val rdd = sc.cassandraTable("system", "schema_keyspaces")
 
    // 4. Count the number of rows
    val num_row = rdd.count()
    println("\n\n Number of rows in system.schema_keyspaces: " + num_row + "\n\n")
 
    // 5. Stop the spark context.
    sc.stop
  }
}
  6. Create a file called cassandra-count.conf in the cassandra-count directory (this file contains the configuration that will be used when we submit the job):
spark.master spark://<spark_master_private_IP1>:7077,<spark_master_private_IP2>:7077,<spark_master_private_IP3>:7077
spark.executor.memory 1g
spark.cassandra.connection.host <private ip of cassandra>
spark.cassandra.auth.username iccassandra
spark.cassandra.auth.password <iccassandra password>
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.eventLog.enabled true
spark.eventLog.dir .
  7. Build the job (from the cassandra-count directory):
sbt assembly
  8. Submit the job (from the cassandra-count directory):
~/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --properties-file cassandra-count.conf --class cassandraCount target/scala-2.10/cassandra-count-assembly-1.0.jar
  9. You should see a lot of log messages, with the row count message about 15 messages from the end.
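
If you prefer to keep everything in code rather than in a properties file, the same settings can also be set directly on the SparkConf. The following is a rough sketch of how the main method above could be adapted (it assumes the same imports as the cassandra-count.scala file, and the placeholders in <> are the same ones used in cassandra-count.conf):

// Alternative (sketch): configure the job in code instead of cassandra-count.conf
val conf = new SparkConf()
  .setAppName("Counting rows of a cassandra table")
  .setMaster("spark://<spark_master_private_IP1>:7077,<spark_master_private_IP2>:7077,<spark_master_private_IP3>:7077")
  .set("spark.cassandra.connection.host", "<private ip of cassandra>")
  .set("spark.cassandra.auth.username", "iccassandra")
  .set("spark.cassandra.auth.password", "<iccassandra password>")

val sc = new SparkContext(conf)
println("Row count: " + sc.cassandraTable("system", "schema_keyspaces").count)
sc.stop()
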
If you have questions regarding this article, feel free to add them in the comments below.
