PySpark lets us run Python code on Apache Spark. For this project, we are going to use the transaction attributes in the data set to predict whether a credit card transaction is fraudulent. For context, it is estimated that there are around 100 billion card transactions per year.

Versions

Application         Version
Spark               2.1.0
Python              3.5.2
pip                 9.0.1
Jupyter Notebook    4.3.0
py4j                0.10.4

Set constants

CSV_PATH = "/vagrant/data/creditcard.csv"
APP_NAME = "Random Forest Example"
SPARK_URL = "local[*]"
RANDOM_SEED = 13579
TRAINING_DATA_RATIO = 0.7
RF_NUM_TREES = 3
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

Load the data set

The credit card fraud data set has been downloaded from Kaggle. They have tons of data available for free. I’ve saved the data to my local machine at /vagrant/data/creditcard.csv. Be sure to set inferSchema = "true" when you load the data so that the numeric columns are read as doubles rather than strings.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName(APP_NAME) \
    .master(SPARK_URL) \
    .getOrCreate()

df = spark.read \
    .options(header="true", inferSchema="true") \
    .csv(CSV_PATH)

print("Total number of rows: %d" % df.count())
Total number of rows: 284,807
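
With inferSchema enabled, every column should come back as a numeric type rather than a string. A quick optional check is to print the inferred schema:

df.printSchema()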

Convert the DataFrame to an RDD of LabeledPoint

Once the CSV data has been loaded, it will be a DataFrame. Because the MLlib random forest API works on RDDs, we need to convert this DataFrame to an RDD of LabeledPoint, where the last column (Class) becomes the label and the remaining columns become a dense feature vector. We also need to split the data into a training set and a test set: the training set is used to build the model, and the test set is used to check how well the generated model generalizes.

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

transformed_df = df.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[0:-1])))

splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
training_data, test_data = transformed_df.randomSplit(splits, RANDOM_SEED)

print("Number of training set rows: %d" % training_data.count())
print("Number of test set rows: %d" % test_data.count())
Number of training set rows: 199,690
Number of test set rows: 85,117
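
To sanity-check the conversion, we can peek at the first record of the transformed RDD; each element should be a LabeledPoint whose label is the Class value and whose features are the remaining columns as a dense vector:

transformed_df.take(1)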

Train the random forest

A random forest is a machine learning classification algorithm: an ensemble of decision trees whose individual predictions are combined by majority vote into a single prediction. We’re also going to track the time it takes to train our model.

from pyspark.mllib.tree import RandomForest
from time import time

start_time = time()

model = RandomForest.trainClassifier(training_data, numClasses=2, categoricalFeaturesInfo={}, \
    numTrees=RF_NUM_TREES, featureSubsetStrategy="auto", impurity="gini", \
    maxDepth=RF_MAX_DEPTH, maxBins=RF_NUM_BINS, seed=RANDOM_SEED)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)
Time to train model: 7.935 seconds

Make predictions and compute accuracy

Once we’ve trained our random forest model, we need to make predictions and test the accuracy of the model. Fortunately, there is a handy predict() function available. The accuracy is defined as the total number of correct predictions divided by the total number of predictions.

predictions = model.predict(test_data.map(lambda x: x.features))
labels_and_predictions = test_data.map(lambda x: x.label).zip(predictions)
acc = labels_and_predictions.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
print("Model accuracy: %.3f%%" % (acc * 100))
Model accuracy: 99.945%

While 99.945% certainly sounds like a good model, remember there are over 100 billion credit and debit card transactions per year. At that volume, this model would still be wrong roughly 55 million times per year. More importantly, the overwhelming majority of transactions in this data set are legitimate, so a model that always predicted "not fraud" would also score very high accuracy. Correcting for this class imbalance with weighting or resampling is beyond the scope of this blog post.
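
As a quick illustration, here is a minimal sketch that reuses the df and labels_and_predictions defined above (and assumes the Kaggle Class column, where 1 marks fraud) to show how rare fraud actually is and how well the model recalls it:

# How imbalanced is the data set? (Class == 1 marks fraud in the Kaggle data.)
fraud_count = df.filter(df["Class"] == 1).count()
total_count = df.count()
print("Fraudulent rows: {:,} of {:,} ({:.3f}%)".format(fraud_count, total_count, 100.0 * fraud_count / total_count))

# Recall on the fraud class is more informative than overall accuracy here.
fraud_in_test = labels_and_predictions.filter(lambda x: x[0] == 1.0).count()
fraud_caught = labels_and_predictions.filter(lambda x: x[0] == 1.0 and x[1] == 1.0).count()
print("Fraud recall on the test set: {:.3f}%".format(100.0 * fraud_caught / fraud_in_test))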

Model evaluation

We can also compute the area under the Precision/Recall (PR) and Receiver Operating Characteristic (ROC) curves for our model.

from pyspark.mllib.evaluation import BinaryClassificationMetrics

start_time = time()

# BinaryClassificationMetrics expects (score, label) pairs, so swap the tuple order
metrics = BinaryClassificationMetrics(labels_and_predictions.map(lambda x: (x[1], x[0])))
print("Area under Precision/Recall (PR) curve: %.f" % (metrics.areaUnderPR * 100))
print("Area under Receiver Operating Characteristic (ROC) curve: %.3f" % (metrics.areaUnderROC * 100))

end_time = time()
elapsed_time = end_time - start_time
print("Time to evaluate model: %.3f seconds" % elapsed_time)
Area under Precision/Recall (PR) curve: 79
Area under Receiver Operating Characteristic (ROC) curve: 91.648
Time to evaluate model: 10.173 seconds

The code for this blog post is available on GitHub.