Using Spark to Sample Data from One Cassandra Cluster and Write to Another

This tutorial describes how you can use Apache Spark and Zeppelin as part of an Instaclustr-managed cluster to extract and sample data from one cluster and write to another cluster.

1. Prerequisites

(1) At least two clusters running on Instaclustr. In this tutorial, the cluster from which we read data is called the “source cluster” and the cluster to which we write the data is called the “target cluster”.

(2) The target cluster is provisioned with Zeppelin and Spark.

(3) The schema of the target table must be identical to that of the source table (table names can be different).
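For example, using the keyspace, table, and column names that appear in the code later in this tutorial, the two tables could both be created with a definition like the following (the columns here are illustrative; your real columns will differ, but they must match between the two tables):

```sql
-- Illustrative schema only: column names and types must be identical
-- on both clusters, though the keyspace/table names may differ.
CREATE TABLE source.sourcetable (
    key   text PRIMARY KEY,
    value text
);

-- On the target cluster:
CREATE TABLE target.targettable (
    key   text PRIMARY KEY,
    value text
);
```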

2. Configure Network Access

As Spark in your target cluster needs to connect to your source cluster to read data, the public IP addresses of the nodes in your target cluster need to be added to the “Cassandra Allowed Addresses” list of your source cluster. The detailed steps are as follows:

  • Open your source cluster dashboard page.
  • Open the “Settings” panel.
  • Add the public IP addresses of your target cluster nodes to “Cassandra Allowed Addresses”.
  • Click “Save Cluster Settings”.

3. Sample and Load Data

(1) Open the dashboard page of your target cluster.

(2) Open the “Details” panel and click the “Zeppelin” button; the Zeppelin web interface will open in your browser.

(3) Create a new notebook by clicking the “Notebook” button on the Zeppelin home page.

(4) Put the following code in the first paragraph to load dependencies.

%dep
z.load("/opt/zeppelin/interpreter/spark/spark-cassandra-connector-assembly-1.6.0-M1.jar")
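If the connector jar is not present at that path on your cluster, Zeppelin’s dependency loader can alternatively fetch the connector from Maven Central. The coordinates below are an assumption matching the jar version shown above; adjust them to the Scala and Spark versions your cluster runs:

```scala
%dep
z.reset()
// Hypothetical coordinates - verify the artifact version against your cluster.
z.load("com.datastax.spark:spark-cassandra-connector_2.10:1.6.0-M1")
```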

(5) Use the following Spark code in the next paragraph to sample data from the source cluster and write it to the target cluster:

%spark
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// Connector pointing at the source cluster
val sourceCluster = CassandraConnector(
  sc.getConf.set("spark.cassandra.connection.host", "<public IP of nodes in source cluster>")
    .set("spark.cassandra.auth.username", "<user name of source cluster>")
    .set("spark.cassandra.auth.password", "<password of source cluster>"))

val rddFromSourceCluster = {
  implicit val c = sourceCluster // connect to the source cluster in this code block
  sc.cassandraTable("source", "sourcetable")
    .select("key")                                   // read only the partition key
    .sample(false, 0.1)                              // sample 10% of the partition keys
    .joinWithCassandraTable("source", "sourcetable") // fetch the full rows for the sampled keys
}
rddFromSourceCluster.saveToCassandra("target", "targettable") // save data to the local (target) cluster
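To sanity-check the copy, you can count the rows that landed in the target table from another Zeppelin paragraph. This is a rough check only: with a 10% sample the count will vary from run to run, and the snippet assumes the default Spark interpreter is already pointed at the target (local) cluster, as it is on an Instaclustr-provisioned Zeppelin:

```scala
%spark
// Count rows written to the target table on the local cluster.
val targetCount = sc.cassandraTable("target", "targettable").count()
println(s"Rows written to target: $targetCount")
```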

For a large dataset, it is very time consuming to extract the whole dataset into Spark and then sample it there. To make this more efficient, the example above samples the partition keys first and then joins the sampled keys back against the source table, which avoids pulling the complete dataset down into Spark.
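If you need a reproducible sample, `RDD.sample` also accepts a seed, so the same keys are selected on every run. A minimal sketch, reusing the `sourceCluster` connector defined earlier (the fraction and seed values here are illustrative):

```scala
%spark
// Deterministic sample: the same seed selects the same partition keys each run.
val reproducibleSample = {
  implicit val c = sourceCluster // connect to the source cluster in this block
  sc.cassandraTable("source", "sourcetable")
    .select("key")
    .sample(withReplacement = false, fraction = 0.05, seed = 42L)
    .joinWithCassandraTable("source", "sourcetable")
}
```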

4. SSL Connection

If encryption is enabled on your source cluster, you need to contact our support team to load the truststore file of the source cluster onto your target cluster. The Spark context should then be configured using the following code:

val sourceCluster = CassandraConnector(
  sc.getConf.set("spark.cassandra.connection.host", "52.6.140.142")
    .set("spark.cassandra.auth.username", "<user name of source cluster>")
    .set("spark.cassandra.auth.password", "<password of source cluster>")
    .set("spark.cassandra.connection.ssl.enabled", "true")
    .set("spark.cassandra.connection.ssl.trustStore.path", "/opt/spark/conf/source_truststore.jks")
    .set("spark.cassandra.connection.ssl.trustStore.password", "instaclustr")
    .set("spark.cassandra.connection.ssl.trustStore.type", "jks"))
If you have questions regarding this article, feel free to add them to the comments below.
