Spark MLlib Linear Regression Example

This article provides a step by step example of using Apache Spark MLlib to do linear regression illustrating some more advanced concepts of using Spark and Cassandra together. The programming environment of this example is Zeppelin and the programming language is Scala. In this article, we assume that Zeppelin and cluster has been set up and provisioned properly as show in our previous tutorials: “Getting started with Instaclustr Spark and Cassandra” and “Zeppelin with Instaclustr Spark & Cassandra Tutorial”. 

1. Prepare Data

(1) Determine features and target

The first thing we need to do is to prepare the data we want to use. But before we can do that, we must determine what data points to use as modeling features and what to use as our target. Our sample data set for this case is a monitoring data set with over 2000 metrics. Using all of these as features is not practical. In this example, we only use three metrics:

service 1: CPU_Percentage (feature 1)

service 2: /var/lib/instaclustr disk-free-percent (feature 2)

service 3: /cassandra/metrics/type=Keyspace/keyspace=instaclustr/name=WriteLatency/max (target)

(2) Transform data

In MLlib, all the features must be put into a special data structure called “Vector”. The vector we need to construct is <feature 1: [CPU Percentage], feature 2: [Disk Free], target/label: [Write Latency]> with one vector for every host/time combination in the data set. The schema of the source Cassandra table (which is called Instametrics.events_raw) is <host, service, time, metric> shown as Table1. Obviously, the two structures are not compatible. So we need to transform the data we get from the Cassandra table to fit the vector structure.

Table 1: Cassandra table in instametrics.events_raw  
 host      service  time      metric 
 host1      service1  time1      [value]    
 host1  service1   time2  [value] 
 host1     service1   time3        [value] 
 ......  ......  ......   ...... 
 host2  service2  time1   [value] 
 host2  service2   time2   [value] 
 host2  service2   time3   [value] 
 ......  ......   ......   ...... 
 host3  service3   time1   [value] 
 host3  service3   time2   [value] 
 host3  serivce3   time3   [value] 
 ......  ......  ......  ......

We need to combine the entries which have service name of service1, service 2 or service 3 and the same values of “host” and “time” to construct the feature vector: <service1, service 2, service 3>. The easy way to do the combination is using join operation in Spark. However, a join operation in Spark can be extremely expensive, especially for datasets of large size, because it requires data shuffling among cluster nodes. So we need to find an efficient way to do join.

As the required feature vector is the combination of values of services, our first step in transforming the data was to save the data of each service into a separated Cassandra table. This is good for repeated use: when you want change the structure of feature vectors, you can pick up the desired service tables and combine them together.

Before starting coding, we need to use cqlsh to create the corresponding schemas and set partition key as (host, time), as we need to join these service tables and joining by partition key is much faster than general join. We can the following code to create the schemas in cqlsh.

CREATE KEYSPACE features WITH REPLICATION={'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USE features;
CREATE TABLE cpuPercent(host text, time timestamp, metric double, primary key ((host,time)));
CREATE TABLE wLatencyMax(host text, time timestamp, metric double, primary key ((host,time)));
CREATE TABLE diskFree(host text, time timestamp, metric double, primary key ((host,time)));

After that, we can use the following code to save data to Cassandra tables. It is worth mentioning that the partition key of Cassandra table “events_raw” is (host, service) and we have anther table called “host” only contains a unique list of Host IDs. In the following, we iterate hosts and use “where” condition to fetch needed data from table “events_raw” by partition key, which is more efficient than fetching the whole table and then using a “filter” operation in Spark.

%dep
z.load(“<full path to the spark-cassandra-cannector-assembly-1.6.0-M1.jar>”)

 

import org.apache.spark.{SparkConf, SparkContext};
import org.apache.spark.SparkContext._;
import org.apache.spark.util.Vector;
import org.joda.time.DateTime;
import com.datastax.spark.connector._;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
import import java.io.FileOutputStream;
import org.apache.spark.mllib.linalg.Vectors;
import java.io.ObjectOutputStream;
import import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.ObjectInputStream
val hosts = sc.cassandraTable(“instametrics”,”host”).as((r:String) => (r)).collect;
var i=0;
for (i <- 0 until hosts.length) { sc.cassandraTable(“instametrics”,”events_raw”).select(“host”,”time”,”metric” ).where(“host=’”+hosts(i)+”’and service=’<service 1>’”).saveToCassandra(“features”,”cpuPercent”,SomeColumns(“host”,”time”,”metric”));
sc.cassandraTable(“instametrics”,”events_raw”).select(“host”,”time”, ”metric”).where(“host=’”+hosts(i)+”’and service=’<service 2>’”).saveToCassandra(“features”,”wLatencyMax”,SomeColumns(“host”,”time”,”metric”));
sc.cassandraTable(“instametrics”,”events_raw”).select(“host”,”time”, ”metric”).where(“host=’”+hosts(i)+”’and service=’<service 3>’”).saveToCassandra(“features”,”diskFree”,SomeColumns(“host”, ”time”,”metric”)); }

Then we get three feature tables of the following structures:

table 2: service 1              table 3: service 2              table 4: servie 3   
 host1  time1  [value]  host1  time1  [value]  host1   time1  [value]
 host1  time2  [value]  host1  time2  [value]  host1  time2  [value]
 host1  time3  [value]  host1  time3  [value]  host1  time3  [value]
 ......  ......  ......  ......  ......  ......  ......  ......  ......
 host2  time1  [value]  host2  time1  [value]  host2  time1  [value]
 host2  time2  [value]  host2  time2  [value]  host2  time2  [value]
 host2  time3  [value]  host2  time3  [value]  host2  time3  [value]
 ......  ......  ......  ......  ......  ......  ......  ......  ......

Finally we use a method provided by the spark-cassandra-connecter called “joinWithCassandraTable”, which can join two Cassandra tables on partition keys and no shuffling is required.

val data=sc.cassandraTable[(String,DateTime,Double)](“features”,”cpuPercent”)
  .joinWithCassandraTable [(String,DateTime,Double)](“features”,”diskFree”)
  .map{case((h1,t1,s1),(h2,t2,s2))=>(h1,t1,s1,s2)}
  .joinWithCassandraTable[(String,DateTime,Double)](“features”,”wLatencymMax”)
  .map{case((h1,t1,s1,s2),(h3,t3,s3))=>LabeledPoint(s3,Vectors.dense(S1,S2))};          

The end result is a RDD table with the vector structure we need for input into the Spark MLlib regression process:

Table 5: Labeled Feature Vector  
 Service 1 (feature 1)      Service 2 (feature 2)       Service 3 (target/label) 
 h1s1t1  h1s2t1  h1s3t1    
 h1s1t2  h1s2t2   h1s3t2
 h1s1t3  h1s2t3  h1s3t3
 ......  ......  ...... 
 h2s1t1  h2s2t1  h2s3t1
 h2s1t2  h2s2t2  h2s3t2
 h2s1t3  h2s2t3  h2s3t3
 ......  ......   ......

2. Train and Test Model

With the data prepared we can start feeding them to MLib. Linear Regression is supervised machine learning algorithm which consists of training and prediction and we need two datasets for the two procedures. The following code splits the dataset we get from Part 1 into two subsets: training and test.

val splits=data.randomSplit(Array (0.8,0.2));
val training=splits(0).cache;
val test=splits(1).cache;

Finally, we can define the Linear Regression algorithm, train and test the model. MLlib provides settings to adjust the algorithm to the needs, but we will use the default for purpose of this example.

val algorithm = new LinearRegressionWithSGD();
val model = algorithm.run(training);
val prediction = model.predict(test.map(_.features));

3. Analysis Results

After we get the prediction for the testing dataset, we can evaluate the model we build. In this example we use “root mean squared error” (RMSE) to quantify the accuracy of the model.

val predictionAndTarget = prediction.zip(test.map(_.label));
val RMSE = math.sqrt(predictionAndTarget.map{case(p,t)=>math.pow((p-t),2)}.mean());
println(“RMSE: ”,RMSE);

Normally, the smaller the RMSE is, the more accurate the model is. But we also need to be very careful to avoid overfitting.

4. Save and Load Model

The linear regression model consists of weights and intercept. Using following code, we can save the model we build to disk for future use and the model file can be found in the installed folder of Zeppelin.

val fos = new FileOutputStream("myModelPath");
val oos = new ObjectOutputStream(fos);
oos.writeObject(model);
oos.close;

We can load the model from disk using the following code.

val fos = new FileInputStream("myModelPath");
val oos = new ObjectInputStream(fos);
val sameModel = oos.readObject().asInstanceOf[LinearRegressionModel];

 

Last updated:
If you have questions regarding this article, feel free to add it to the comments below.

0 Comments

Please sign in to leave a comment.