Instaclustr Spark with an SSL-Configured Cassandra Cluster

A common setup for a Cassandra cluster is to enable client encryption. To use Spark with such a cluster, additional steps must be taken when submitting jobs so that the Spark Cassandra Connector connects over SSL. This guide walks through these steps and explains the configuration properties involved.

As a prerequisite for this guide, you should have provisioned and configured a cluster with both Cassandra and Spark. Details on how to do this are in the article Getting Started with Instaclustr Spark & Cassandra.

Download Truststore File

You need to download the certificates for the cluster from the Connection Info page for your cluster. The accompanying image shows the download button at the top right-hand side of the page.
 
 
In the downloaded zip you will find a Java Key Store file called truststore.jks. This file needs to be included as a resource in the assembled jar in a later step.
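Before packaging the trust store, it can be worth confirming that the file opens with the password you have and contains the expected certificate. The JDK's keytool does this with keytool -list -keystore truststore.jks. The sketch below does the same programmatically with the standard java.security.KeyStore API; the class name TrustStoreCheck is hypothetical, and it assumes the file is in JKS format, as the .jks extension suggests.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: opens a JKS trust store and lists the certificates it contains. */
public class TrustStoreCheck {

    /** Loads the trust store; a wrong password makes KeyStore.load throw an IOException. */
    static KeyStore load(InputStream in, char[] password) throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance("JKS"); // assumption: JKS format
        keyStore.load(in, password);
        return keyStore;
    }

    /** Describes each entry by alias, subject and expiry date. */
    static List<String> describe(KeyStore keyStore) throws GeneralSecurityException {
        List<String> lines = new ArrayList<>();
        for (String alias : Collections.list(keyStore.aliases())) {
            Certificate cert = keyStore.getCertificate(alias);
            if (cert instanceof X509Certificate) {
                X509Certificate x509 = (X509Certificate) cert;
                lines.add(alias + ": " + x509.getSubjectX500Principal()
                        + " (expires " + x509.getNotAfter() + ")");
            } else {
                lines.add(alias + ": " + (cert == null ? "no certificate" : cert.getType()));
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Usage: java TrustStoreCheck truststore.jks <trust store password>
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            KeyStore keyStore = load(in, args[1].toCharArray());
            System.out.println(keyStore.size() + " entries");
            describe(keyStore).forEach(System.out::println);
        }
    }
}
```

A wrong password surfaces here as an IOException, which is easier to diagnose than a failed SSL handshake later in a Spark job.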

Creating and Submitting a Scala Job with SSL Cassandra Connection

In this part of the tutorial we demonstrate how to build and submit a Scala job. This is useful when you want to build a job once and submit it multiple times.

  1. Log in to your Spark client machine
  2. Create the required directories for your project:
mkdir ~/cassandra-count
cd ~/cassandra-count
mkdir -p src/main/scala src/main/java src/main/resources project
  3. Create a file called build.sbt in the cassandra-count directory with the following contents (note: the blank lines are important):
name := "cassandra-count"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M1"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0" % "provided"

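// Keep the last copy of this Netty metadata file when several dependencies provide it;
// use the plugin's default merge strategy for everything else.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}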
  4. Create a file called assembly.sbt in the cassandra-count/project directory with the following contents (this adds the sbt-assembly plugin, which bundles the required dependencies into the output jar):
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
  5. Create a file called cassandra-count.scala in the cassandra-count/src/main/scala directory with the following contents:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object cassandraCount {
  def main(args: Array[String]): Unit = {
    // 1. Create a conf for the Spark context.
    // The Spark master, Cassandra hosts and SSL settings are supplied at submit time
    // through the cassandra-count.conf properties file, so they are not hard-coded here.
    val conf = new SparkConf().setAppName("Counting rows of a Cassandra table")

    // 2. Create a Spark context.
    val sc = new SparkContext(conf)

    // 3. Create an RDD backed by the table "schema_keyspaces" in the keyspace "system".
    // This table exists in Cassandra 2.x; from Cassandra 3.0 the equivalent table is
    // "keyspaces" in the "system_schema" keyspace.
    val rdd = sc.cassandraTable("system", "schema_keyspaces")

    // 4. Count the number of rows.
    val numRows = rdd.count()
    println("\n\n Number of rows in system.schema_keyspaces: " + numRows + "\n\n")

    // 5. Stop the Spark context.
    sc.stop()
  }
}
  6. For Spark to connect to Cassandra over SSL, an appropriate SSL context must be created on the Spark driver and on every executor. This is done by passing SSL-specific properties to the Spark Cassandra Connector. With the connector's default connection factory, the trust store path must be a valid file path on the driver and on every executor, which can be restrictive. An alternative is to supply a custom connection factory. Next we create a custom Cassandra connection factory that treats the trust store path property as a classpath resource rather than a file path, so the trust store can be read from inside the assembled jar. Create a file called CustomCassandraConnectionFactory.java in the cassandra-count/src/main/java directory with the following contents:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
import com.datastax.spark.connector.cql.CassandraConnectionFactory;
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy;
import com.datastax.spark.connector.cql.MultipleRetryPolicy;
import scala.collection.immutable.HashSet;
import scala.collection.immutable.Set;
import scala.reflect.ClassTag;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;

/**
 * Connection factory that loads the trust store from a classpath resource
 * (for example, a file packaged inside the assembled jar) instead of the file system.
 */
public class CustomCassandraConnectionFactory implements CassandraConnectionFactory {

    @Override
    public Cluster createCluster(CassandraConnectorConf conf) {
        try {
            return clusterBuilder(conf).build();
        } catch (IOException | GeneralSecurityException e) {
            throw new RuntimeException("Could not create an SSL-enabled Cassandra cluster", e);
        }
    }

    // This factory reads no extra configuration properties of its own.
    @Override
    public Set<String> properties() {
        return new HashSet<String>();
    }

    private Cluster.Builder clusterBuilder(CassandraConnectorConf conf)
            throws IOException, GeneralSecurityException {
        SocketOptions socketOptions = new SocketOptions();
        socketOptions.setConnectTimeoutMillis(conf.connectTimeoutMillis());
        socketOptions.setReadTimeoutMillis(conf.readTimeoutMillis());

        // Copy the Scala set of contact points into a Java list. InetAddress (not
        // Inet4Address) is used so that IPv6 addresses do not cause a ClassCastException.
        List<InetAddress> hosts = new ArrayList<InetAddress>();
        scala.collection.Iterator<InetAddress> iter = conf.hosts().toIterator();
        while (iter.hasNext()) {
            hosts.add(iter.next());
        }

        // Apply the connection settings from the connector configuration.
        Cluster.Builder builder = Cluster.builder()
            .addContactPoints(hosts.toArray(new InetAddress[0]))
            .withPort(conf.port())
            .withRetryPolicy(
                new MultipleRetryPolicy(conf.queryRetryCount(), conf.queryRetryDelay()))
            .withReconnectionPolicy(
                new ExponentialReconnectionPolicy(conf.minReconnectionDelayMillis(), conf.maxReconnectionDelayMillis()))
            .withLoadBalancingPolicy(
                new LocalNodeFirstLoadBalancingPolicy(conf.hosts(), conf.localDC(), true))
            .withAuthProvider(conf.authConf().authProvider())
            .withSocketOptions(socketOptions)
            .withCompression(conf.compression());

        if (conf.cassandraSSLConf().enabled()) {
            SSLOptions options = createSSLOptions(conf.cassandraSSLConf());
            // Without a trust store path, fall back to the driver's default SSL options.
            builder = (options != null) ? builder.withSSL(options) : builder.withSSL();
        }
        return builder;
    }

    private SSLOptions createSSLOptions(CassandraConnectorConf.CassandraSSLConf conf)
            throws IOException, GeneralSecurityException {
        if (conf.trustStorePath().isEmpty()) {
            return null;
        }
        String resource = conf.trustStorePath().get();
        // Treat the trust store path as a classpath resource, not a file path.
        try (InputStream trustStore = getClass().getClassLoader().getResourceAsStream(resource)) {
            // getResourceAsStream returns null for a missing resource, and KeyStore.load(null, ...)
            // would silently create an empty trust store, so fail fast instead.
            if (trustStore == null) {
                throw new IOException("Trust store resource not found on the classpath: " + resource);
            }
            KeyStore keyStore = KeyStore.getInstance(conf.trustStoreType());
            char[] password = conf.trustStorePassword().isDefined()
                ? conf.trustStorePassword().get().toCharArray()
                : null;
            keyStore.load(trustStore, password);

            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(keyStore);

            SSLContext context = SSLContext.getInstance(conf.protocol());
            context.init(null, tmf.getTrustManagers(), new SecureRandom());

            // Convert the Scala set of enabled cipher suites into a Java String[].
            ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
            String[] cipherSuites = (String[]) conf.enabledAlgorithms().toArray(tag);

            return JdkSSLOptions.builder()
                .withSSLContext(context)
                .withCipherSuites(cipherSuites)
                .build();
        }
    }
}
  7. Copy the truststore.jks file downloaded in the earlier step to the cassandra-count/src/main/resources directory.
  8. Additional properties are needed to configure the SSL connection to Cassandra:
    Property Name | Description
    spark.cassandra.connection.ssl.enabled | Boolean switch indicating whether the connection to Cassandra should use SSL.
    spark.cassandra.connection.ssl.trustStore.password | The password for the trust store.
    spark.cassandra.connection.ssl.trustStore.path | The path to the trust store file. With the custom factory in this example, this is the path to a classpath resource instead.
    spark.cassandra.connection.factory | Overrides the connection behaviour of the default Spark Cassandra Connector. When set, it should be the fully qualified name of a class that implements CassandraConnectionFactory. Details can be found on the DataStax Spark Cassandra Connector page on GitHub.
    Create a file called cassandra-count.conf in the cassandra-count directory (this file contains the configuration used when the job is submitted):
spark.master spark://<spark_master_private_IP1>:7077,<spark_master_private_IP2>:7077,<spark_master_private_IP3>:7077
spark.executor.memory 1g
spark.cassandra.connection.host <private ip of cassandra>
spark.cassandra.auth.username iccassandra
spark.cassandra.auth.password <iccassandra password>
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.eventLog.enabled true
spark.eventLog.dir .
spark.cassandra.connection.ssl.enabled true
spark.cassandra.connection.ssl.trustStore.password <trust store password>
spark.cassandra.connection.ssl.trustStore.path truststore.jks
spark.cassandra.connection.factory CustomCassandraConnectionFactory
  9. Build the job (from the cassandra-count directory):
sbt assembly
  10. Submit the job (from the cassandra-count directory):
~/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --properties-file cassandra-count.conf --class cassandraCount target/scala-2.10/cassandra-count-assembly-1.0.jar
  11. You should see a large number of log messages, with the row count message appearing about 15 lines from the end of the output.
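If the job fails to connect, one thing to check is that cassandra-count.conf actually contains the SSL-related properties from the property table. The sketch below reads the file with java.util.Properties, which, as far as we know, is the format spark-submit uses for --properties-file (a key separated from its value by whitespace, '=' or ':'). The class ConfCheck and its REQUIRED list are hypothetical helpers, not part of Spark or the connector.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the SSL-related keys in cassandra-count.conf. */
public class ConfCheck {

    // Keys this guide relies on for the SSL connection (hypothetical selection).
    static final List<String> REQUIRED = Arrays.asList(
        "spark.cassandra.connection.host",
        "spark.cassandra.connection.ssl.enabled",
        "spark.cassandra.connection.ssl.trustStore.path",
        "spark.cassandra.connection.ssl.trustStore.password",
        "spark.cassandra.connection.factory");

    /** Returns the required keys that are missing or blank in the given properties text. */
    static List<String> missingKeys(Reader confText) throws IOException {
        Properties props = new Properties();
        // Properties.load accepts whitespace, '=' or ':' between a key and its value.
        props.load(confText);
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "cassandra-count.conf";
        try (Reader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            List<String> missing = missingKeys(reader);
            if (missing.isEmpty()) {
                System.out.println("All required SSL properties are set in " + path);
            } else {
                System.out.println("Missing or empty properties in " + path + ": " + missing);
            }
        }
    }
}
```

Run it from the cassandra-count directory with javac ConfCheck.java && java ConfCheck cassandra-count.conf. It only checks that the keys are present and non-empty; it cannot tell whether the host, password or class name values are correct.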

Using Spark Shell

Connecting to Cassandra over SSL from Spark Shell works the same way as with spark-submit. The jar containing the custom connection factory and the trust store resource must be added to the list of jar files, and the same SSL configuration properties must be specified. Below is an example Spark Shell command:

cd ~/spark-1.6.0-bin-hadoop2.6/bin
./spark-shell \
  --master spark://<spark_master_private_IP1>:7077,<spark_master_private_IP2>:7077,<spark_master_private_IP3>:7077 \
  --conf spark.cassandra.connection.host=<private ip of cassandra> \
  --conf spark.cassandra.auth.username=iccassandra \
  --conf spark.cassandra.auth.password=<iccassandra password> \
  --jars ~/spark-cassandra-connector-assembly-1.6.0-M1.jar,$HOME/cassandra-count/target/scala-2.10/cassandra-count-assembly-1.0.jar \
  --conf spark.cassandra.connection.ssl.enabled=true \
  --conf spark.cassandra.connection.ssl.trustStore.password=instaclustr \
  --conf spark.cassandra.connection.ssl.trustStore.path=truststore.jks \
  --conf spark.cassandra.connection.factory=CustomCassandraConnectionFactory \
  --files truststore.jks
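Whether the job is submitted with spark-submit or the jar is loaded into Spark Shell, the custom factory finds truststore.jks only if it is visible as a resource to the class loader that loaded the factory. The following sketch isolates that lookup so it can be tried outside Spark: it reports where the class loader finds the resource, fails loudly when the resource is missing (ClassLoader.getResourceAsStream returns null, and KeyStore.load(null, ...) would otherwise create an empty trust store), and builds an SSLContext from it. ResourceTrustStore and its methods are hypothetical names, and the JKS store type and TLS protocol are assumptions that we believe match the connector's defaults.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical diagnostic: resolve a trust store from the classpath and build an SSLContext. */
public class ResourceTrustStore {

    /** Where the class loader finds the resource (e.g. inside a jar), or null if it is not visible. */
    static URL locate(ClassLoader loader, String resource) {
        return loader.getResource(resource);
    }

    /** Builds a TLS context that trusts the certificates in the given classpath resource. */
    static SSLContext sslContext(ClassLoader loader, String resource, char[] password)
            throws IOException, GeneralSecurityException {
        try (InputStream in = loader.getResourceAsStream(resource)) {
            // Fail fast: KeyStore.load(null, ...) would quietly create an empty trust store.
            if (in == null) {
                throw new IOException("Trust store resource not found on the classpath: " + resource);
            }
            KeyStore trustStore = KeyStore.getInstance("JKS"); // assumption: JKS format
            trustStore.load(in, password);

            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);

            SSLContext context = SSLContext.getInstance("TLS"); // assumption: TLS protocol
            context.init(null, tmf.getTrustManagers(), new SecureRandom());
            return context;
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = ResourceTrustStore.class.getClassLoader();
        System.out.println("Resource location: " + locate(loader, args[0]));
        SSLContext context = sslContext(loader, args[0], args[1].toCharArray());
        System.out.println("Created SSLContext using protocol " + context.getProtocol());
    }
}
```

For example, after compiling it with javac, run java -cp target/scala-2.10/cassandra-count-assembly-1.0.jar:. ResourceTrustStore truststore.jks <trust store password> from the cassandra-count directory. If the printed location is null, the trust store is not on that classpath, and the custom factory would not find it either.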

Further Resources

You can find the source code used in this guide at this GitHub page.

If you have questions about this article, feel free to add them in the comments below.

1 Comment

  • hemalatha1991

    Thanks. It is a really helpful article.

    I set my CustomCassandraConnectionFactory like sparkConf.set("spark.cassandra.connection.factory", "CustomCassandraConnectionFactory"), but I receive an exception as below which indicates it cannot load my class. Do I need to change any classpath settings to avoid this error?

    ERROR  root: Unable to connect to cassandra
    com.datastax.spark.connector.util.ReflectionUtil$.findGlobalObject(ReflectionUtil.scala:55)
    com.datastax.spark.connector.cql.CassandraConnectionFactory$$anonfun$fromSparkConf$1.apply(CassandraConnectionFactory.scala:115)
    com.datastax.spark.connector.cql.CassandraConnectionFactory$$anonfun$fromSparkConf$1.apply(CassandraConnectionFactory.scala:115)
    scala.Option.map(Option.scala:145)
    com.datastax.spark.connector.cql.CassandraConnectionFactory$.fromSparkConf(CassandraConnectionFactory.scala:115)
    com.datastax.spark.connector.util.ConfigCheck$.checkConfig(ConfigCheck.scala:42)
    com.datastax.spark.connector.cql.CassandraConnectorConf$.apply(CassandraConnectorConf.scala:256)
    com.datastax.spark.connector.cql.CassandraConnector$.apply(CassandraConnector.scala:188)
    com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:52)
    main.Streaming$.createContext(Streaming.scala:198)
    main.Streaming$$anonfun$3.apply(Streaming.scala:113)
    main.Streaming$$anonfun$3.apply(Streaming.scala:113)
    scala.Option.getOrElse(Option.scala:120)
    org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:844)
    main.Streaming$.main(Streaming.scala:113)
    main.Streaming.main(Streaming.scala)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:497)
    org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)

     
