
DTs for churn prediction
DTs are commonly considered a supervised learning technique used for solving classification and regression tasks.
More technically, each branch in a DT represents a possible decision, occurrence, or reaction, in terms of statistical probability. Compared to naive Bayes, DTs are often a more robust classification technique, partly because they do not assume that the features are conditionally independent given the class. A DT recursively splits the training data on feature values, choosing at each node the split that best separates the classes, and the resulting rules generalize to infer the labels or classes of unseen samples. Most interestingly, a DT algorithm can handle both binary and multiclass classification problems.
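The split selection described above can be illustrated with a minimal plain-Scala sketch. It is not Spark's implementation (Spark discretizes continuous features into at most maxBins candidate splits and computes the statistics in a distributed way), and the object and function names are hypothetical. For one numeric feature, it tries the midpoint between each pair of consecutive distinct values as a threshold and keeps the threshold whose two child nodes have the lowest weighted impurity, measured with either Gini impurity or entropy.

```scala
object SplitSketch {
  // Gini impurity: 1 - sum over classes of p_k^2
  def gini(labels: Seq[Int]): Double = {
    val n = labels.size.toDouble
    if (n == 0) 0.0
    else 1.0 - labels.groupBy(identity).values.map(g => math.pow(g.size / n, 2)).sum
  }

  // Entropy: -sum over classes of p_k * log2(p_k)
  def entropy(labels: Seq[Int]): Double = {
    val n = labels.size.toDouble
    if (n == 0) 0.0
    else labels.groupBy(identity).values.map { g =>
      val p = g.size / n
      -p * math.log(p) / math.log(2)
    }.sum
  }

  // Returns (threshold, weighted child impurity) for the best split.
  // Assumes xs has at least two distinct values.
  def bestSplit(xs: Seq[Double], ys: Seq[Int],
                impurity: Seq[Int] => Double): (Double, Double) = {
    val data = xs.zip(ys)
    val distinct = xs.distinct.sorted
    // Candidate thresholds: midpoints between consecutive distinct values
    val candidates = distinct.zip(distinct.tail).map { case (a, b) => (a + b) / 2 }
    candidates.map { t =>
      val (left, right) = data.partition(_._1 <= t)
      val weighted = (left.size * impurity(left.map(_._2)) +
        right.size * impurity(right.map(_._2))) / data.size.toDouble
      (t, weighted)
    }.minBy(_._2)
  }
}
```

Minimizing the weighted child impurity is equivalent to maximizing the information gain, because the parent's impurity is the same for every candidate split. Gini and entropy are the two impurity options searched later in this section.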
For instance, in the following example figure, a DT learns a set of if...else decision rules from admission data. The dataset contains a record for each student who applied for admission to, say, an American university. Each record contains the Graduate Record Examination (GRE) score, the CGPA score, and the rank of the college. The task is to predict which applicants are competent based on these three features (variables). DTs can solve this kind of problem once the model has been trained and unwanted branches of the tree have been pruned. In general, a deeper tree signifies more complex decision rules and a more closely fitted model:

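To show how such if...else rules can be represented, here is a minimal Scala sketch of a binary decision tree as an algebraic data type, applied to the three admission features. The tree and its thresholds are hypothetical, chosen by hand for illustration rather than learned from data; each root-to-leaf path corresponds to one if...else rule.

```scala
object AdmissionTreeSketch {
  // Internal nodes test one feature against a threshold; leaves hold a class.
  sealed trait Tree
  final case class Leaf(prediction: String) extends Tree
  final case class Node(feature: String, threshold: Double,
                        left: Tree, right: Tree) extends Tree

  // Walk from the root: go left if feature <= threshold, otherwise go right.
  def predict(tree: Tree, x: Map[String, Double]): String = tree match {
    case Leaf(p) => p
    case Node(f, t, l, r) => if (x(f) <= t) predict(l, x) else predict(r, x)
  }

  // Hypothetical hand-picked thresholds (not learned from any dataset).
  val tree: Tree =
    Node("gre", 600.0,
      Leaf("not admitted"),
      Node("cgpa", 3.5,
        Node("rank", 2.0, Leaf("admitted"), Leaf("not admitted")),
        Leaf("admitted")))
}
```

For example, the path gre > 600, cgpa <= 3.5, rank <= 2 reads as the rule "if the GRE score is above 600 and the CGPA is at most 3.5, admit only applicants from a college ranked 1 or 2".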
A deeper tree, however, also fits the training data more closely, which increases the risk of overfitting. Now let's see some pros and cons of DTs:

Now that we know the working principle of DTs, let's start using the Spark-based implementation of DTs. First, we import the required packages and libraries:
import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{DecisionTreeClassifier, DecisionTreeClassificationModel}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
Now let's create a Spark session and import its implicits:
val spark: SparkSession = SparkSessionCreate.createSession("ChurnPredictionDecisionTrees")
import spark.implicits._
Now, once we have the hyperparameters defined and initialized, the next task is to instantiate a DecisionTreeClassifier estimator, as follows:
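val dTree = new DecisionTreeClassifier()
  .setLabelCol("label")        // column holding the class label
  .setFeaturesCol("features")  // column holding the assembled feature vector
  .setSeed(1234567L)           // fixed seed for reproducible results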
Now that we have three preprocessing stages and an estimator ready, the next task is to chain them into a single pipeline, in which each of them acts as a stage:
val pipeline = new Pipeline()
.setStages(Array(PipelineConstruction.ipindexer,
PipelineConstruction.labelindexer,
PipelineConstruction.assembler,dTree))
Let's define the parameter grid for a grid search over the hyperparameter space. The search runs over the DT's impurity measure, maximum number of bins, and maximum depth to find the best model. The maximum depth limits how deep the tree can grow: depth 0 means 1 leaf node, and depth 1 means 1 internal node plus 2 leaf nodes.
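The depth rule above generalizes: each extra level can at most double the number of leaves. The following small sketch (hypothetical helper names) computes the upper bound on leaves and total nodes for a given maximum depth.

```scala
object DepthSketch {
  // A binary tree of depth d has at most 2^d leaves ...
  def maxLeaves(depth: Int): Long = 1L << depth
  // ... and at most 2^(d + 1) - 1 nodes in total (internal nodes plus leaves).
  def maxNodes(depth: Int): Long = (1L << (depth + 1)) - 1
}
```

For example, a tree of depth 5 has at most 63 nodes, so the depth-5, 53-node tree reported later in this section is within this bound, while Spark's upper limit of maxDepth = 30 allows more than two billion nodes.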
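The maximum number of bins, on the other hand, controls how continuous features are discretized when choosing how to split on features at each node; more bins give a finer granularity of candidate splits. Spark also requires maxBins to be at least the number of categories in any categorical feature. In short, we search through the decision tree's impurity, maxBins, and maxDepth parameters for the best model: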
val paramGrid = new ParamGridBuilder()
  .addGrid(dTree.impurity, "gini" :: "entropy" :: Nil)
  .addGrid(dTree.maxBins, 2 :: 5 :: 10 :: 15 :: 20 :: 25 :: 30 :: Nil)
  // Spark supports a maximum tree depth of 30
  .addGrid(dTree.maxDepth, 5 :: 10 :: 15 :: 20 :: 25 :: 30 :: Nil)
  .build()
In the preceding code segment, we build the grid from Scala lists of candidate values, so the grid contains every combination of impurity, maxBins, and maxDepth. Cross-validation then evaluates each combination and keeps the one that yields the best model.
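A grid search enumerates the Cartesian product of the candidate values. The following plain-Scala sketch (a hypothetical helper, not ParamGridBuilder itself) builds the same kind of combinations with a for-comprehension, which makes it easy to see how quickly the search space grows.

```scala
object GridSketch {
  // Every (impurity, maxBins, maxDepth) combination, as a grid search would try.
  def grid(impurities: Seq[String], maxBins: Seq[Int],
           maxDepths: Seq[Int]): Seq[(String, Int, Int)] =
    for {
      imp   <- impurities
      bins  <- maxBins
      depth <- maxDepths
    } yield (imp, bins, depth)
}
```

With the distinct values listed in the preceding grid (two impurities, seven maxBins values, and six maxDepth values), this gives 2 × 7 × 6 = 84 combinations. With 10-fold cross-validation, each combination is trained 10 times, or 840 fits, before CrossValidator refits the best combination on the full training set. In this sketch, listing a value twice only produces duplicate combinations, adding training cost without exploring anything new.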
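Let's define a BinaryClassificationEvaluator to evaluate the model. Its default metric is the area under the ROC curve, which we set explicitly here:
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("prediction") // the hard 0/1 predictions are used as the score
  .setMetricName("areaUnderROC")     // the default metric, made explicit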
We use a CrossValidator to perform 10-fold cross-validation for best-model selection:
// numFolds (10 for 10-fold cross-validation) is assumed to be defined with the other hyperparameters
val crossval = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(numFolds)
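To illustrate what k-fold cross-validation does, here is a minimal plain-Scala sketch that assigns row indices to k folds and pairs each held-out validation fold with the remaining training rows. It uses a deterministic round-robin assignment for readability, whereas Spark's CrossValidator splits the data randomly; the helper name is hypothetical.

```scala
object KFoldSketch {
  // Returns one (trainingIndices, validationIndices) pair per fold.
  // Row i goes to fold i % k; each fold is held out exactly once.
  def folds(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] = {
    val indices = 0 until n
    (0 until k).map { f =>
      val (validation, training) = indices.partition(_ % k == f)
      (training, validation)
    }
  }
}
```

For every hyperparameter combination, the model is trained on each training part and scored on the matching validation fold; the average of the k scores is what CrossValidator compares across combinations.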
Let's now call the fit method so that the complete predefined pipeline, including all feature preprocessing and the DT classifier, is executed multiple times, each time with a different hyperparameter combination:
val cvModel = crossval.fit(Preprocessing.trainDF)
Now it's time to evaluate the predictive power of the DT model on the test dataset. As a first step, we need to transform the test set with the model pipeline, which will map the features according to the same mechanism we described in the previous feature engineering step:
val predictions = cvModel.transform(Preprocessing.testSet)
predictions.show(10)
>>>

However, from the preceding prediction DataFrame alone, it is difficult to judge how well the model performs. In the second step, we evaluate the predictions with BinaryClassificationEvaluator, as follows:
// evaluate() returns the evaluator's metric, here the area under the ROC curve
val areaUnderROC = evaluator.evaluate(predictions)
println("Area under ROC: " + areaUnderROC)
>>>
Area under ROC: 0.870334928229665
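Because BinaryClassificationEvaluator defaults to the area under the ROC curve, the value above is not classification accuracy. If plain accuracy is wanted, one option is Spark's MulticlassClassificationEvaluator with metricName set to "accuracy", which also works for two classes. The following self-contained sketch assumes Spark 2.x or later on the classpath and uses a tiny hypothetical DataFrame in place of the test-set predictions; with this section's pipeline, the same accuracy function could be applied to predictions.

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.{DataFrame, SparkSession}

object AccuracySketch {
  // Fraction of rows where prediction == label.
  def accuracy(predictions: DataFrame): Double =
    new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
      .evaluate(predictions)

  // Hypothetical (label, prediction) pairs: 3 of the 4 rows are correct.
  def demo(): Double = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("AccuracySketch")
      .getOrCreate()
    import spark.implicits._
    val df = Seq((1.0, 1.0), (0.0, 0.0), (1.0, 0.0), (0.0, 0.0)).toDF("label", "prediction")
    val acc = accuracy(df)
    spark.stop()
    acc
  }
}
```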
So the evaluator reports an area under the ROC curve of about 0.87 for our binary classification model. Because this metric is computed from the hard 0/1 predictions, it should not be read as classification accuracy. Now, similar to SVM and LR, we will observe the area under the precision-recall curve and the area under the ROC curve based on the following RDD of (prediction, label) pairs on the test set:
val predictionAndLabels = predictions
  .select("prediction", "label")
  .rdd.map(row => (row.getDouble(0), row.getDouble(1)))
Now the preceding RDD can be used to compute the two previously mentioned performance metrics:
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under the precision-recall curve: " + metrics.areaUnderPR)
println("Area under the receiver operating characteristic (ROC) curve: " + metrics.areaUnderROC)
>>>
Area under the precision-recall curve: 0.7293101942399631
Area under the receiver operating characteristic (ROC) curve: 0.870334928229665
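Note that this area under the ROC curve equals the value returned earlier by BinaryClassificationEvaluator. That is expected: both are computed from the hard 0/1 predictions, so the ROC curve has only one intermediate point between (0, 0) and (1, 1), and the area under it reduces to (TPR + TNR) / 2, also known as balanced accuracy. The following plain-Scala sketch (hypothetical helper names and counts) checks this with the trapezoidal rule.

```scala
object HardAucSketch {
  // ROC points for a classifier that outputs only 0/1: (0,0), (FPR, TPR), (1,1).
  def auc(tp: Long, fp: Long, tn: Long, fn: Long): Double = {
    val tpr = tp.toDouble / (tp + fn)
    val fpr = fp.toDouble / (fp + tn)
    // Trapezoid from (0,0) to (fpr,tpr) plus trapezoid from (fpr,tpr) to (1,1)
    fpr * tpr / 2 + (1 - fpr) * (tpr + 1) / 2
  }

  // Average of the true positive rate and the true negative rate.
  def balancedAccuracy(tp: Long, fp: Long, tn: Long, fn: Long): Double =
    (tp.toDouble / (tp + fn) + tn.toDouble / (tn + fp)) / 2
}
```

For example, with 80 true positives, 20 false negatives, 90 true negatives, and 10 false positives, TPR = 0.8 and TNR = 0.9, so both functions give 0.85.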
In this case, the area under the ROC curve is about 0.87, while the area under the precision-recall curve is only about 0.73; both are still much better than those of SVM and LR. Next, we calculate a few more metrics: the ratios of true and false positive and negative predictions are also useful for evaluating the model's performance:
val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter(not($"label" === $"prediction")).count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val ratioCorrect = correct.toDouble / counttotal.toDouble
// Assuming 1.0 encodes churn (the positive class); each ratio is a fraction of all test rows
val truep = lp.filter($"prediction" === 1.0)
  .filter($"label" === $"prediction").count() / counttotal.toDouble
val truen = lp.filter($"prediction" === 0.0)
  .filter($"label" === $"prediction").count() / counttotal.toDouble
val falsep = lp.filter($"prediction" === 1.0)
  .filter(not($"label" === $"prediction")).count() / counttotal.toDouble
val falsen = lp.filter($"prediction" === 0.0)
  .filter(not($"label" === $"prediction")).count() / counttotal.toDouble
println("Total Count : " + counttotal)
println("Correct : " + correct)
println("Wrong: " + wrong)
println("Ratio wrong: " + ratioWrong)
println("Ratio correct: " + ratioCorrect)
println("Ratio true positive : " + truep)
println("Ratio false positive : " + falsep)
println("Ratio true negative : " + truen)
println("Ratio false negative : " + falsen)
>>>

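The ratios above can be cross-checked with a small plain-Scala sketch that counts the confusion-matrix cells from (label, prediction) pairs and derives accuracy, precision, and recall. It assumes that 1.0 encodes churn (the positive class); the names are hypothetical. Unlike the ratios above, which divide by the total count, precision and recall divide by the number of predicted or actual positives.

```scala
object ConfusionSketch {
  final case class Counts(tp: Long, fp: Long, tn: Long, fn: Long) {
    def total: Long = tp + fp + tn + fn
    def accuracy: Double = (tp + tn).toDouble / total
    def precision: Double = tp.toDouble / (tp + fp) // of predicted churners, how many churned
    def recall: Double = tp.toDouble / (tp + fn)    // of actual churners, how many were caught
  }

  // Count outcomes from (label, prediction) pairs, treating 1.0 as positive.
  def count(pairs: Seq[(Double, Double)]): Counts = {
    def n(pred: ((Double, Double)) => Boolean): Long = pairs.count(pred).toLong
    Counts(
      tp = n { case (l, p) => p == 1.0 && l == 1.0 },
      fp = n { case (l, p) => p == 1.0 && l == 0.0 },
      tn = n { case (l, p) => p == 0.0 && l == 0.0 },
      fn = n { case (l, p) => p == 0.0 && l == 1.0 })
  }
}
```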
Fantastic; the model performs well, but which factors drive its predictions? To find out, we can inspect the decision tree constructed during training. But first, let's see which model was selected as the best one after cross-validation:
val bestModel = cvModel.bestModel
println("The Best Model and Parameters:\n--------------------")
println(bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel].stages(3))
>>>
The Best Model and Parameters:
DecisionTreeClassificationModel (uid=dtc_1fb45416b18b) of depth 5 with 53 nodes.
That means the best tree model has depth 5 and 53 nodes. Now let's print its hyperparameters and then extract the decisions taken during tree construction by showing the tree. This tree helps us find the most valuable features in our dataset:
// Print the hyperparameters of the best DT stage (the fourth pipeline stage)
println(bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel]
  .stages(3)
  .extractParamMap)
val treeModel = bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel]
.stages(3)
.asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)
>>>
Learned classification tree model:
If (feature 3 <= 245.2)
If (feature 11 <= 3.0)
If (feature 1 in {1.0})
If (feature 10 <= 2.0)
Predict: 1.0
Else (feature 10 > 2.0)
If (feature 9 <= 12.9)
Predict: 0.0
Else (feature 9 > 12.9)
Predict: 1.0
...
Else (feature 7 > 198.0)
If (feature 2 <= 28.0)
Predict: 1.0
Else (feature 2 > 28.0)
If (feature 0 <= 60.0)
Predict: 0.0
Else (feature 0 > 60.0)
Predict: 1.0
In the preceding output, toDebugString prints the tree's decision nodes, and the final predictions appear at the leaves. Features 3 and 11 appear at the top of the tree, which suggests that they are the two most important factors in deciding whether a customer is likely to churn. But what are those two features? Let's see them:
println("Feature 11: " + Preprocessing.trainDF.select(PipelineConstruction.featureCols(11)))
println("Feature 3: " + Preprocessing.trainDF.select(PipelineConstruction.featureCols(3)))
>>>
Feature 11: [total_international_num_calls: double]
Feature 3: [total_day_mins: double]
So the decision tree selected total_international_num_calls and total_day_mins as its top features, which shows that DTs provide an automated mechanism for determining the most important features.
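Reading importance off the tree printout is informal. DecisionTreeClassificationModel also exposes a featureImportances vector, which sums the impurity reduction contributed by each feature across its splits (weighted by the number of instances reaching each node) and normalizes the result to sum to 1; for the model above, that would be treeModel.featureImportances. The following self-contained sketch assumes Spark 2.x or later on the classpath and trains on a tiny hypothetical dataset in which only feature 0 determines the label.

```scala
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession

object ImportanceSketch {
  def demo(): Vector = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("ImportanceSketch")
      .getOrCreate()
    import spark.implicits._
    // Hypothetical data: the label depends only on feature 0; feature 1 is noise.
    val df = Seq(
      (0.0, Vectors.dense(1.0, 0.0)),
      (0.0, Vectors.dense(2.0, 1.0)),
      (0.0, Vectors.dense(3.0, 0.0)),
      (1.0, Vectors.dense(7.0, 1.0)),
      (1.0, Vectors.dense(8.0, 0.0)),
      (1.0, Vectors.dense(9.0, 1.0))
    ).toDF("label", "features")
    val model = new DecisionTreeClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .fit(df)
    val importances = model.featureImportances // one entry per feature, summing to 1
    spark.stop()
    importances
  }
}
```

Here a single split on feature 0 separates the classes perfectly, so feature 0 receives all of the importance and feature 1 none.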
Wait! We are not finished yet. Last but not least, we will use an ensemble technique, RF, which is considered a more robust classifier than DTs. Again, let's use the Random Forest implementation from the Apache Spark ML package.