Getting Started with Spark and Elassandra with Instaclustr

Elassandra (Elasticsearch + Cassandra) is a fork of Elasticsearch modified to run on top of Apache Cassandra to provide advanced search features on Cassandra tables. In this tutorial we walk you through the basic steps of setting up an Instaclustr Elassandra cluster with Spark on Amazon Web Services (AWS), and show how to write to and query Elassandra from Spark. The high-level steps are:

  1. Provision a cluster with Elassandra and Spark
  2. Set up a Spark client to communicate with Elassandra via the Elasticsearch REST API
  3. Configure network access
  4. Run basic queries to read data from Elassandra using the Spark Shell
  5. Submit a Spark job to write to an Elassandra index

This tutorial assumes that you are familiar with launching and connecting to servers in AWS. While this tutorial uses AWS, we also support Spark on Azure, IBM SoftLayer and Google Cloud Platform. You can follow a similar approach on those platforms, or contact support@instaclustr.com if you need more detailed instructions.

1. Provision a cluster with Elassandra and Spark

a) If you haven’t already signed up for an Instaclustr account, refer to our support article to sign up and create an account.

b) Once you have signed up for Instaclustr and verified your email, log in to the Instaclustr console and click the Create Cassandra Cluster button.


c) On the Create Cassandra Cluster page, enter an appropriate name and network address block for your cluster. Refer to our support article on Network Address Allocation to understand how we divide the specified network range to determine the node IP addresses. Under the Applications section, select:

  • Elassandra 2.4.2.13 (Cassandra 3.0.10) (preview)
  • Apache Spark as an Add-on


d) Under the Data Centre section, select:

  • Amazon Web Services as the Infrastructure Provider
  • A minimum node size of m4l-250


e) Leave the other options as default. Accept the terms and conditions and click the Create Cluster button.


The cluster will be provisioned automatically and will be available for use once all nodes are in the Running state.

 

2. Set Up a Spark Client

To use your newly created Spark cluster, you will need to set up a client machine to submit jobs. Use the following steps to set up a client in AWS:

a) Provision a new AWS server with the following configuration:

  • Region: same as your newly created Elassandra and Spark cluster
  • VPC: if possible, use a VPC with DNS resolution and DNS hostnames enabled (otherwise, refer to step g below). The VPC network range must not overlap with the network range of your Instaclustr cluster.
  • AMI: Ubuntu Server 14.04 LTS (HVM), SSD Volume Type
  • Size: t2.small or t2.micro is sufficient for this tutorial and for many ongoing use cases
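Because AWS does not allow VPC peering between VPCs with overlapping IPv4 CIDR blocks, it can help to check the two ranges before creating the VPC. The following is a minimal sketch in Java, assuming IPv4 CIDR notation; the class name `CidrOverlap` and the example ranges in `main` are hypothetical, so substitute your own VPC range and the cluster range shown in the Instaclustr console.

```java
// Sketch: check whether two IPv4 CIDR ranges overlap (hypothetical helper class).
final class CidrOverlap {
    // Returns {firstAddress, lastAddress} of an IPv4 CIDR block as unsigned values held in longs.
    static long[] range(String cidr) {
        String[] parts = cidr.split("/");
        int prefix = Integer.parseInt(parts[1]);
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("bad prefix: " + cidr);
        }
        String[] octets = parts[0].split("\\.");
        if (octets.length != 4) {
            throw new IllegalArgumentException("bad address: " + cidr);
        }
        long addr = 0;
        for (String octet : octets) {
            int value = Integer.parseInt(octet);
            if (value < 0 || value > 255) {
                throw new IllegalArgumentException("bad octet: " + cidr);
            }
            addr = (addr << 8) | value;
        }
        long size = 1L << (32 - prefix);                  // number of addresses in the block
        long first = addr & ~(size - 1) & 0xFFFFFFFFL;    // clear the host bits
        return new long[] {first, first + size - 1};
    }

    // Two ranges overlap when each one starts no later than the other ends.
    static boolean overlaps(String a, String b) {
        long[] ra = range(a);
        long[] rb = range(b);
        return ra[0] <= rb[1] && rb[0] <= ra[1];
    }

    public static void main(String[] args) {
        String clientVpc = "172.31.0.0/16";  // hypothetical client VPC range
        String cluster = "10.224.0.0/16";    // hypothetical Instaclustr cluster range
        System.out.println(overlaps(clientVpc, cluster)
                ? "OVERLAP: choose a different VPC range"
                : "OK: ranges do not overlap");
    }
}
```

Compile and run it with `javac CidrOverlap.java && java CidrOverlap`, after replacing the two example ranges.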

b) SSH to the newly launched server using ubuntu as the username.

c) Download the Spark version that matches the cluster created in step 1 (in this case, Spark 1.6.0):

wget https://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz

d) Extract the Spark files:

tar -xvf spark-1.6.0-bin-hadoop2.6.tgz

e) Download the Spark Elassandra assembly JAR (a fat JAR built by Instaclustr that includes all required dependencies, for use with the Spark Shell). The latest version available for your Spark version should be listed on the Connection Info page of the Instaclustr console.

wget https://static.instaclustr.com/spark/spark-elassandra-connector-assembly-1.6.2.jar

f) Install the Java Development Kit:

sudo apt-get update
sudo apt-get install default-jdk

g) If you are not using a VPC with DNS resolution and DNS hostnames enabled, change the hostname of the client to its private IP address so that it resolves when used by Spark. This is a bit of a hack (the proper fix is to edit /etc/hosts), but it is quicker:

sudo hostname <spark_client_private_ip>
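To confirm that the client's hostname now resolves, a small standalone Java check can help. This is a sketch: the class name `HostnameCheck` is hypothetical, and it relies on the documented behaviour of `InetAddress.getByName`, which only validates the format of an IP literal instead of performing a DNS lookup. That is why setting the hostname to the private IP avoids resolution failures.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Sketch: report whether a host name resolves (hypothetical helper class).
final class HostnameCheck {
    // True if the name resolves. For an IP literal, getByName only checks
    // the address format and performs no DNS lookup.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the client's hostname, e.g.: java HostnameCheck "$(hostname)"
        String name = args.length > 0 ? args[0] : "localhost";
        System.out.println(name + (resolves(name) ? " resolves" : " does NOT resolve"));
    }
}
```

Run it after step f (once the JDK is installed) with `javac HostnameCheck.java && java HostnameCheck "$(hostname)"`.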

h) Install curl to run some test REST calls from the command line:

sudo apt-get install curl

i) To build the final Java example, install Maven:

sudo apt-get install maven

 

3. Configure Client Network Access

 As Spark has minimal security, we recommend that you access Spark from a peered VPC in AWS to increase the security of network-based access rules. To set up the peered VPC and allow connections from your VPC to the cluster, follow our support article on Using VPC Peering AWS.

Note: When following the VPC Peering instructions, you must add your VPC network range to the Spark Allowed Addresses and the Cassandra Allowed Addresses. The Spark driver on your client machine needs to be able to connect to Cassandra as well as the Spark workers (to establish partition ranges).

To add your VPC network range to Cassandra and Spark allowed addresses:

a) On the Cluster Overview page, click Cluster Settings in the Manage Cluster menu of your cluster.


b) On the Settings page, add your VPC network range to Cassandra Allowed Addresses, Elassandra REST API Allowed Addresses, Spark Masters Allowed Addresses and Spark Jobserver Allowed Addresses. Then click the Save Cluster Settings button.


In addition to connections from the Spark client to the cluster, Spark's architecture requires the cluster to connect back to the client, because the Spark driver runs on the client machine. Enable this in AWS by editing the security group associated with your Spark client to add an inbound rule with the following values:

  • Type: Custom TCP Rule
  • Protocol: TCP
  • Port Range: 1024-65535
  • Source: Custom IP, <your cluster network range> (viewable from the cluster details page in the Instaclustr console)


 

4. Open Elassandra REST Port in the Cluster

By default, Elasticsearch listens for REST API calls on port 9200 and uses port 9300 for internal communication between nodes. Elassandra uses the same ports. However, these ports are not open in the cluster for security reasons. Instead, the client machine can talk to Elassandra on port 9201, which is secured by a CA-signed certificate. To allow the client to connect to Elassandra’s REST port (9201) in the cluster:

  • Log in to the Instaclustr console and click the Settings tab in your cluster panel.
  • In the Elassandra REST API Allowed Addresses box, add the public IP address of your client.
  • Save the changes (refer to steps 3a and 3b above).

Having these settings in place allows the client to make REST calls to Elassandra on port 9201.
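Before trying any REST calls, you can check that the firewall rule has taken effect by opening a plain TCP connection to port 9201. The following is a minimal Java sketch; the class name `PortCheck` is hypothetical, and it only tests TCP reachability, not TLS or authentication.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch: check that a TCP port (e.g. the Elassandra REST port 9201) accepts connections.
final class PortCheck {
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out (e.g. client IP not yet allowed) or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // args: <elassandra_REST_API_URL> [port]; the port defaults to 9201
        String host = args[0];
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9201;
        boolean ok = isReachable(host, port, 5000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

For example, `java PortCheck <elassandra_REST_API_URL> 9201`. A timeout usually points to a missing allowed-address entry rather than a problem with Elassandra itself.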

 

5. Run a Cluster Health Check

Before running any Spark job, either via the Spark Shell or the spark-submit command, it is worth checking that the cluster has been set up and configured properly. You can do this by running a few basic curl commands that index Twitter-like data (based on the classic Elasticsearch demo).

Note: Before running the commands, find the required authentication and URL information on the Connection Info page in the console and substitute it into the commands. To find this information:

a) On the Cluster Overview page, click Connection Details in the Manage Cluster menu of your cluster.


b) On the Connection Info page, you can view Elassandra REST API username, password, port number and URL.


Once you have the authentication and URL information, log in to your Spark client and run the following commands to create a Twitter user and two tweets in Elassandra. Replace the placeholders in angle brackets with the corresponding authentication and URL information.

curl -XPUT 'https://<elassandra_username>:<elassandra_password>@<elassandra_REST_API_URL>:9201/twitter/user/kimchy' -d '{ "name" : "Shay Banon" }'
curl -XPUT 'https://<elassandra_username>:<elassandra_password>@<elassandra_REST_API_URL>:9201/twitter/tweet/1' -d '
{
"postDate": "2009-11-15T13:12:00",
"message": "Trying out Elassandra as an Instaclustr managed service, so far so good?",
"user": "kimchy"
}'
curl -XPUT 'https://<elassandra_username>:<elassandra_password>@<elassandra_REST_API_URL>:9201/twitter/tweet/2' -d '
{
"postDate": "2009-11-15T14:12:12",
"message": "Another tweet, will it be indexed?",
"user": "kimchy"
}'

Each command should return a response like the one below, indicating that the document was created in the twitter index.

{"_index":"twitter","_type":"user","_id":"kimchy","_version":1,"_shards":{"total":1,"successful":1,"failed":0},"created":true}
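The same request can be sent from Java, for example to test connectivity from application code rather than curl. This sketch uses only the JDK's `HttpURLConnection` (Java 8 or later, for `java.util.Base64`); the class and method names are hypothetical. Unlike curl, `HttpURLConnection` does not take credentials from the URL, so the sketch sends an explicit `Authorization: Basic` header instead.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: PUT one JSON document to the Elassandra REST API (hypothetical helper class).
final class IndexDocument {
    // Value for the "Authorization" header using HTTP Basic authentication
    static String basicAuth(String user, String pass) {
        byte[] token = (user + ":" + pass).getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(token);
    }

    // Document URL of the form https://host:port/index/type/id
    static String documentUrl(String host, int port, String index, String type, String id) {
        return "https://" + host + ":" + port + "/" + index + "/" + type + "/" + id;
    }

    // Sends the PUT request and returns "<status code> <response body>"
    static String put(String url, String auth, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Authorization", auth);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        // Error responses (4xx/5xx) arrive on the error stream, which may be null
        InputStream in = status < 400 ? conn.getInputStream() : conn.getErrorStream();
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        if (in != null) {
            try (InputStream stream = in) {
                byte[] buf = new byte[4096];
                int n;
                while ((n = stream.read(buf)) != -1) {
                    body.write(buf, 0, n);
                }
            }
        }
        return status + " " + new String(body.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // args: <elassandra_REST_API_URL> <elassandra_username> <elassandra_password>
        String url = documentUrl(args[0], 9201, "twitter", "user", "kimchy");
        System.out.println(put(url, basicAuth(args[1], args[2]), "{ \"name\" : \"Shay Banon\" }"));
    }
}
```

This mirrors the first curl command above; the printed status code and JSON body can be compared with the sample response.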

 

6. Run basic queries against Elassandra from Spark Shell

We will now connect to the Spark cluster using the Spark Shell and run some queries.

a) Note the IP addresses of the three Spark Masters in your cluster – this is viewable on the Spark tab on the Instaclustr console for your cluster.


b) Log in to your Spark Client and run the following commands:

$ cd ~/spark-1.6.0-bin-hadoop2.6/bin
$ ./spark-shell --master spark://<spark_master_IP1>:7077,<spark_master_IP2>:7077,<spark_master_IP3>:7077 \
    --conf spark.es.nodes=<elassandra_Public_IP_Address> \
    --conf spark.es.port=9202 \
    --conf spark.es.net.ssl.cert.allow.self.signed=true \
    --conf spark.es.net.ssl=true \
    --conf spark.es.net.http.auth.user=<elassandra_username> \
    --conf spark.es.net.http.auth.pass=<elassandra_password> \
    --jars ~/spark-elassandra-connector-assembly-1.6.2.jar
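The options above follow two simple rules. In standalone high-availability mode, the master URL has a single `spark://` prefix followed by every master as `host:port`, separated by commas. And because spark-shell and spark-submit only accept `--conf` keys that start with `spark.`, the elasticsearch-hadoop `es.*` settings are passed with a `spark.` prefix, which elasticsearch-hadoop also accepts. The hypothetical Java helper below sketches both rules; the master IPs in `main` are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: build spark-shell options from cluster details (hypothetical helper class).
final class ShellArgs {
    // Standalone HA master URL: one "spark://" prefix, then host:port for every master
    static String masterUrl(List<String> masterIps, int port) {
        StringBuilder sb = new StringBuilder("spark://");
        for (int i = 0; i < masterIps.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(masterIps.get(i)).append(':').append(port);
        }
        return sb.toString();
    }

    // spark-shell only accepts --conf keys starting with "spark.", so es.* becomes spark.es.*
    static List<String> confFlags(Map<String, String> esSettings) {
        List<String> flags = new ArrayList<String>();
        for (Map.Entry<String, String> e : esSettings.entrySet()) {
            flags.add("--conf");
            flags.add("spark." + e.getKey() + "=" + e.getValue());
        }
        return flags;
    }

    public static void main(String[] args) {
        // Hypothetical master IPs; use the ones shown on the Spark tab of the console
        List<String> masters = Arrays.asList("10.224.0.10", "10.224.0.11", "10.224.0.12");
        Map<String, String> es = new LinkedHashMap<String, String>();
        es.put("es.nodes", "<elassandra_Public_IP_Address>");
        es.put("es.port", "9202");
        es.put("es.net.ssl", "true");

        StringBuilder cmd = new StringBuilder("./spark-shell --master ").append(masterUrl(masters, 7077));
        for (String flag : confFlags(es)) {
            cmd.append(' ').append(flag);
        }
        System.out.println(cmd);
    }
}
```

When the same settings are set programmatically on a `SparkConf`, as in the Java job later in this tutorial, they are used without the `spark.` prefix.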

c) The Spark Shell should start without any errors. It prints many log messages; once it has fully started, you will see a "scala>" prompt as shown below:

spark_shell_pix.png

d) Some imports are necessary. For our simple job, enter the following at the prompt:

import org.elasticsearch.spark._

e) Now run the following code to read the Elassandra index into a Spark RDD. By default, each document is returned as a Tuple2 whose first element is the document ID and whose second element is a Map of the document's fields.

val tweet_rdd = sc.esRDD("twitter/tweet")
tweet_rdd.take(5).foreach(println)

f) You should now see the following output:

(1,Map(postDate -> Sun Nov 15 13:12:00 UTC 2009, message -> Trying out Elassandra as an Instaclustr managed service, so far so good?, user -> kimchy))
(2,Map(postDate -> Sun Nov 15 14:12:12 UTC 2009, message -> Another tweet, will it be indexed?, user -> kimchy))

 

7. Submit a Spark job to write to Elassandra index

In this step of the tutorial we demonstrate how to build and submit a Spark job, written in Java, that writes to an Elassandra index.

a) Create the following folder structure on the client:

  • spark_job/src/main/java/SimpleApp.java
  • spark_job/pom.xml

b) Add the following to the Maven POM file (pom.xml):

<project>
  <groupId>com.instaclustr</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency> <!-- Elasticsearch for Apache Hadoop (Spark API) -->
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>2.2.0</version>
    </dependency>
  </dependencies>
</project>

c) Create a file called SimpleApp.java in the src/main/java directory and add the code below. Replace the placeholders in angle brackets with the corresponding cluster settings shown in the console (refer to steps 5a, 5b and 6a above):

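import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import scala.Tuple2;

public class SimpleApp {
    public static void main(String[] args) {
        // List all three Spark Masters so the job can survive a master failover
        SparkConf conf = new SparkConf()
                .setAppName("Simple Application")
                .setMaster("spark://<spark_master_IP1>:7077,<spark_master_IP2>:7077,<spark_master_IP3>:7077");
        // Elassandra (Elasticsearch REST) connection settings read by elasticsearch-hadoop
        conf.set("es.nodes", "<elassandra_Public_IP_Address>");
        conf.set("es.port", "9202");
        conf.set("es.net.ssl", "true");
        conf.set("es.net.ssl.cert.allow.self.signed", "true");
        conf.set("es.net.http.auth.user", "<elassandra_username>");
        conf.set("es.net.http.auth.pass", "<elassandra_password>");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Write one user document to twitter/user; the pair key becomes the document _id
        // (the id "sparkuser" and the name below are example values only)
        Map<String, Object> user = new HashMap<String, Object>();
        user.put("name", "Spark User");
        JavaPairRDD<String, Map<String, Object>> newUsers = jsc.parallelizePairs(
                Collections.singletonList(new Tuple2<String, Map<String, Object>>("sparkuser", user)));
        JavaEsSpark.saveToEsWithMeta(newUsers, "twitter/user");

        // Read the index back and print how many user documents it now holds
        JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "twitter/user");
        System.out.println("Records Count:" + esRDD.count());

        jsc.stop();
    }
}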

d) Build the job (from the spark_job directory):

mvn package

e) Submit the job (from the spark_job directory):

$ ~/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
    --jars ~/spark-elassandra-connector-assembly-1.6.2.jar \
    --class "SimpleApp" target/simple-project-1.0.jar

You should see many log messages, with the record count printed near the end.


 

8. Conclusion

In this tutorial you have learned how to:

  • Provision a cluster with Elassandra and Spark
  • Set up and configure a Spark client
  • Run basic curl commands from the Spark client
  • Run basic queries against Elassandra from the Spark Shell
  • Submit a Spark job to write to an Elassandra index

For more information, refer to the following resources:

 
