Using Spark to Sample Data from One Cassandra Cluster and Write to Another

This tutorial describes how you can use Apache Spark and Zeppelin as part of an Instaclustr-managed cluster to extract and sample data from one cluster and write to another cluster.

1. Prerequisites

(1) At least two clusters running in Instaclustr. In this tutorial, the cluster from which we read data is called “source cluster” and the cluster to which we write the data is called “target cluster”.

(2) The target cluster is provisioned with Zeppelin and Spark.

(3) The keyspace of the target table must be identical to that of the source table (table names can be different).
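If the keyspace does not already exist on one of your clusters, you can create it up front. A minimal sketch; the data centre name and replication factor below are illustrative placeholders, so substitute the values appropriate for your own cluster:

```
CREATE KEYSPACE IF NOT EXISTS instaclustr
  WITH replication = {'class': 'NetworkTopologyStrategy', '<data centre name>': 3};
```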

2. Configure Network Access

As Spark in your target cluster needs to connect to your source cluster to read data, the public IP addresses of the nodes in your target cluster need to be added to the “Cassandra Allowed Addresses” list of your source cluster. The detailed steps are as follows:

  • Open your source cluster dashboard page.
  • Click the “Settings” panel.
  • Add the public IP addresses of your target cluster nodes to “Cassandra Allowed Addresses”.
  • Click “Save Cluster Settings”.
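To confirm the firewall change took effect, you can try connecting from a target-cluster node to the source cluster with cqlsh. A sketch only; substitute the source node's public IP and your own Cassandra credentials:

```
# Run from a node of the target cluster (or any other allowed host).
cqlsh <public IP of a source cluster node> -u <username> -p <password> -e "DESCRIBE KEYSPACES;"
```

If the address has been allowed correctly, the command prints the keyspaces on the source cluster instead of timing out.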

3. Create Table Definition on Source Cassandra Cluster and Target Cassandra Cluster

(1) Check the public IP address of your source cluster node.

(2) Open a terminal.

(3) Make sure cqlsh is installed on your system.

(4) Execute:

cqlsh <public IP address of source cluster> 

(5) Change to instaclustr keyspace:

use instaclustr; 

(6) Create a table called “users”:

CREATE TABLE users (
    userid text PRIMARY KEY,
    first_name text,
    last_name text,
    emails set<text>,
    top_scores list<int>,
    todo map<timestamp, text>
);

(7)  Insert test data:

INSERT INTO users(userid, first_name, last_name) VALUES ('1', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('2', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('3', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('4', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('5', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('6', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('7', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('8', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('9', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('10', 'f_name_src', 'l_name_src');

(8) Execute “quit” to exit cqlsh.

(9) Check the public IP addresses of your target cluster nodes.

(10) Execute:

cqlsh <public IP address of target cluster>

(11) Change to instaclustr keyspace:

use instaclustr;

(12) Create a table called “users”: 

CREATE TABLE users (
    userid text PRIMARY KEY
);

(13) Execute “select” CQL command: 

SELECT * FROM users;

The result should be empty. 

[Image: result.png]

4. Sample and Load Data

(1) Open the dashboard page of your target cluster.

(2) Open the “Details” panel and click the “Zeppelin” button; the Zeppelin web UI will open in your browser.

[Image: zeppelin.png]

(3) Create a new notebook by clicking the “Notebook” button on the home page of Zeppelin.

(4) Put the following code in the first paragraph to load dependencies.

%dep
z.load("/opt/zeppelin/interpreter/spark/spark-cassandra-connector-assembly-2.0.2.jar")

(5) Use the following Spark code in the next paragraph to sample data from the source cluster and write it to the target cluster:

%spark
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

val sourceCluster = CassandraConnector(
  sc.getConf.set("spark.cassandra.connection.host", "<public IP of nodes in source cluster>")
    .set("spark.cassandra.auth.username", "<user name of source cluster>")
    .set("spark.cassandra.auth.password", "<password of source cluster>"))

val rddFromSourceCluster = {
  implicit val c = sourceCluster // connect to the source cluster in this code block
  sc.cassandraTable("<source keyspace>", "<source table>")
    .select("<PK column>")
    .sample(false, 0.1) // sample roughly 10% of the rows, without replacement
}
rddFromSourceCluster.saveToCassandra("<target keyspace>", "<target table>") // save the sampled keys to the target cluster

For a large dataset, extracting the whole table into Spark and then sampling it there is very time consuming. The example above is more efficient: it selects and samples only the partition key column, so the complete data set is never pulled down to Spark. If the full rows are needed, the sampled partition keys can then be joined back against the source table.
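When the full rows (not just the keys) are required on the target cluster, the sampled keys can be joined back to the source table with the connector's joinWithCassandraTable. A minimal sketch, reusing the placeholder keyspace, table, and connection settings from the previous paragraph; note that for this variant the target table must define the same columns as the source table:

```scala
%spark
// Join the sampled partition keys back against the source table to fetch
// the full rows, then write those rows to the target cluster.
val sampledFullRows = {
  implicit val c = sourceCluster // reuse the CassandraConnector defined above
  rddFromSourceCluster
    .joinWithCassandraTable("<source keyspace>", "<source table>")
    .map { case (_, row) => row } // keep only the joined full rows
}
sampledFullRows.saveToCassandra("<target keyspace>", "<target table>")
```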

(6) Check the result on target Cassandra Cluster.

(7) Go back to the terminal environment.

(8) Execute the “select” CQL command again:

SELECT * FROM users;

The result should look like the following:

[Image: result_2.png]

 

5. SSL Connection

If encryption is enabled on your source cluster, you will need to contact our support team at support@instaclustr.com to load the truststore file of the source cluster onto your target cluster. In addition, the Spark context should be configured using the following code:

val sourceCluster = CassandraConnector(
  sc.getConf.set("spark.cassandra.connection.host", "<Source IP>")
    .set("spark.cassandra.auth.username", "<user name of source cluster>")
    .set("spark.cassandra.auth.password", "<password of source cluster>")
    .set("spark.cassandra.connection.ssl.enabled", "true")
    .set("spark.cassandra.connection.ssl.trustStore.type", "jks")
    .set("spark.cassandra.connection.ssl.trustStore.path", "/opt/spark/conf/source_truststore.jks")
    .set("spark.cassandra.connection.ssl.trustStore.password", "instaclustr"))
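Once support has loaded the truststore, you can optionally confirm it is in place and readable with keytool. The path and password below match the configuration shown above; adjust them if yours differ:

```
# List the certificates in the source cluster's truststore on a target node
keytool -list -keystore /opt/spark/conf/source_truststore.jks -storepass instaclustr
```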