[Apache Spark] Machine Learning from Disaster: Naive Bayes

Naive Bayes is a probabilistic classifier that assigns a feature vector to a class. It makes the (probably wrong) assumption that the features are statistically independent of each other. Nevertheless, Apache Spark implements Naive Bayes, and I will show you how to use it on the Titanic dataset. The code below is Scala. Because Scala is very compact, you should be familiar with the language and with the basics of Apache Spark.

Pre-processing the data

First we need to read the data from CSV and implement the data preparation step from the last post.

Reading unstructured data in Spark is very easy: you can read any file with the textFile($path) function of the SparkContext. Since we have structured CSV data, we want to read it with the new DataFrame/Dataset API and not as a simple text file. To do so, we use a third-party library from Databricks (spark-csv).

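    // trainPath is a placeholder for the location of the Kaggle training CSV
    val df = sqlContext.read
      .format("com.databricks.spark.csv") // spark-csv data source
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true") // Automatically infer data types
      .load(trainPath)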

Getting some statistics

We know that the Fare and Age columns have null values, and we want to fill in the mean value in that case. Spark MLlib has a Statistics class, which offers these calculations for RDDs of type Vector. So we create a dense vector from Fare and Age for each row.

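    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

    // Build one dense vector (Fare, Age) per row
    val statsDf = df.rdd.map { row =>
      Vectors.dense(row.getAs[Double]("Fare"), row.getAs[Double]("Age"))
    }
    val summary: MultivariateStatisticalSummary = Statistics.colStats(statsDf)
    val meanFare = summary.mean(0)
    val meanAge = summary.mean(1)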
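The idea behind this step, taking the mean of the known values and using it wherever a value is missing, can be sketched in plain Scala without Spark. This is only an illustration under my own assumptions: the object name MeanImputation, its two helper functions and the sample ages are hypothetical and not part of the repository. Missing values are modelled as None and are left out of the mean.

```scala
object MeanImputation {
  /** Mean of the values that are present; None if every value is missing. */
  def meanOfPresent(values: Seq[Option[Double]]): Option[Double] = {
    val present = values.flatten
    if (present.isEmpty) None else Some(present.sum / present.size)
  }

  /** Replace each missing value with the given fallback. */
  def fillMissing(values: Seq[Option[Double]], fallback: Double): Seq[Double] =
    values.map(_.getOrElse(fallback))
}

// Hypothetical ages, two of them missing
val ages: Seq[Option[Double]] = Seq(Some(22.0), None, Some(38.0), Some(30.0), None)

// Mean over the three known ages
val sketchMeanAge: Double = MeanImputation.meanOfPresent(ages).getOrElse(0.0)

// Every missing age is replaced by that mean
val filledAges: Seq[Double] = MeanImputation.fillMissing(ages, sketchMeanAge)
```

Keeping the missing values out of the mean matters: if they were counted as 0.0, the mean would be pulled down.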

Run the Code

  1. Download and extract Spark. You may also need the Hadoop binaries.
  2. Set up HADOOP_HOME and SPARK_HOME, and make sure JAVA_HOME is set.
  3. Clone the repository.
  4. Build the project with Maven.
  5. Run TitanicBayesTest.

Local Spark Context

To get a local Spark context, which works with the local file system instead of HDFS and runs in a single JVM, we set spark.master to local[4]. The 4 is the number of worker threads used for this context, typically one per core. We also use the KryoSerializer instead of the default one.

    val conf = new SparkConf().setAppName("Naive_bayes_titanic")
    conf.set("spark.master", "local[4]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryoserializer.buffer.max", "512m")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

User defined functions

In Spark and Scala, data is immutable by default, so every manipulation creates a new DataFrame or RDD. In the RDD context we can simply use map to manipulate data. When working with DataFrames (structured data) we use user defined functions (UDFs). A UDF can be defined for several input columns or for just one. We only want to replace null values, so each of our UDFs takes a single column.

After defining the UDFs, we select the columns (SQL-like) and use each UDF as a function in the select statement. The alias names the new column (which replaces the old one in this case).

    // Replace a missing fare with the mean fare
    val normFare = udf((d: String) => d match {
      case null => Some(meanFare)
      case s => Some(s.toDouble)
    })

    // Replace a missing age with the mean age
    val normAge = udf((d: String) => d match {
      case null => Some(meanAge)
      case s => Some(s.toDouble)
    })

    // select the columns we need and apply the udfs
    val preprocessed = df.select(df("Survived"), normFare(df("Fare")).alias("Fare"), normAge(df("Age")).alias("Age"), df("Pclass"))
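The two UDFs above differ only in the fallback value, so the null handling can be written once as a plain Scala function and tried out without Spark. The following is a minimal sketch, not code from the repository: NullSafeParse, parseOrElse and the fallback values 32.2 and 29.7 are hypothetical stand-ins for the computed means.

```scala
object NullSafeParse {
  /** Parse a possibly-null string as a Double; use `default` when it is null. */
  def parseOrElse(default: Double)(raw: String): Double =
    Option(raw).map(_.trim.toDouble).getOrElse(default)
}

// Hypothetical fallbacks standing in for meanFare and meanAge
val normFareFn: String => Double = NullSafeParse.parseOrElse(32.2)
val normAgeFn: String => Double = NullSafeParse.parseOrElse(29.7)

val parsedFare = normFareFn("7.25") // a present value is parsed
val missingFare = normFareFn(null)  // a missing value falls back to the default
```

Option(raw) turns null into None, which avoids matching on null directly. Such a function could then be wrapped with udf(...) as before. Note that an empty or non-numeric string would still throw a NumberFormatException in this sketch.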


To train the Bayes model we need LabeledPoints, which represent the annotated data. Each labeled point corresponds to one entry in the DataFrame and consists of the label and the feature vector. The label is the class. In our case there are only two classes, survived and not survived. If you have more labels, use whole numbers to represent them.

NaiveBayes supports some optional parameters. You can specify lambda (the smoothing parameter) and the model type to use. Multinomial is what we want here. The Bernoulli model would only accept 0 and 1 as vector values (occurrences).

    // Extract label and features from the preprocessed DataFrame as typed tuples
    val mappedDf = preprocessed.rdd.map(row =>
      (row.getAs[Int]("Survived"), row.getAs[Double]("Fare"), row.getAs[Int]("Pclass"), row.getAs[Double]("Age")))

    val labeledData = mappedDf.map { case (survived, fare, pclass, age) =>
      LabeledPoint(survived, Vectors.dense(fare, pclass, age))
    }
    val naiveBayesModel = NaiveBayes.train(labeledData, lambda = 1.0, modelType = "multinomial")
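To make the role of lambda and of the multinomial model more concrete, here is a small, Spark-free sketch of multinomial Naive Bayes with additive smoothing. It follows the textbook formulation and is not Spark's actual implementation. The name TinyMultinomialNB and the four training rows (fare, pclass, age) are made up for illustration.

```scala
object TinyMultinomialNB {
  /** Log prior per class and log feature weight per class and feature. */
  final case class Model(labels: Vector[Int],
                         logPrior: Vector[Double],
                         logTheta: Vector[Vector[Double]]) {
    /** Unnormalised log score of each class for feature vector x. */
    def scores(x: Vector[Double]): Vector[Double] =
      logPrior.indices.toVector.map { c =>
        logPrior(c) + x.zip(logTheta(c)).map { case (xi, t) => xi * t }.sum
      }

    /** Label of the class with the highest score. */
    def predict(x: Vector[Double]): Int = {
      val best = scores(x).zipWithIndex.reduce((a, b) => if (b._1 > a._1) b else a)
      labels(best._2)
    }
  }

  def train(data: Seq[(Int, Vector[Double])], lambda: Double): Model = {
    require(data.nonEmpty, "need at least one example")
    require(data.forall(_._2.forall(_ >= 0.0)), "features must be non-negative")
    val numFeatures = data.head._2.size
    val byLabel = data.groupBy(_._1).toVector.sortBy(_._1)
    val labels = byLabel.map(_._1)
    // Smoothed class frequencies
    val logPrior = byLabel.map { case (_, rows) =>
      math.log((rows.size + lambda) / (data.size + labels.size * lambda))
    }
    // Smoothed share of each feature in the total feature mass of the class
    val logTheta = byLabel.map { case (_, rows) =>
      val sums = rows.map(_._2).reduce((a, b) => a.zip(b).map { case (u, v) => u + v })
      val total = sums.sum
      sums.map(s => math.log((s + lambda) / (total + numFeatures * lambda)))
    }
    Model(labels, logPrior, logTheta)
  }
}

// Made-up rows: label (1 = survived, 0 = not survived), features (fare, pclass, age)
val toyData = Seq(
  (1, Vector(80.0, 1.0, 30.0)),
  (1, Vector(70.0, 1.0, 25.0)),
  (0, Vector(8.0, 3.0, 28.0)),
  (0, Vector(7.0, 3.0, 35.0))
)

val toyModel = TinyMultinomialNB.train(toyData, lambda = 1.0)
val expensiveTicket = toyModel.predict(Vector(75.0, 1.0, 28.0))
val cheapTicket = toyModel.predict(Vector(7.5, 3.0, 30.0))
```

With lambda = 0, a feature whose values sum to zero within a class would get a log weight of negative infinity, which is why a positive lambda is used for smoothing. The sketch also shows why the multinomial model expects non-negative feature values: it treats them like counts.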


Now we have trained the model and can predict the survival of passengers from the validation/test data set. We read the data from the test set in the same way as the training set and also create the feature vectors. Then we use the predict method, which returns the class. In this case we don't use UDFs, just to show how you can convert between RDDs and DataFrames.


    // df is the test set here, read and prepared like the training set
    val resultRDD = df.rdd.map { row =>
      val denseVector = Vectors.dense(row.getAs[Double]("Fare"), row.getAs[Int]("Pclass"), row.getAs[Double]("Age"))
      val result = naiveBayesModel.predict(denseVector)
      Row.fromTuple((row.getAs[Int]("PassengerId"), result.toInt))
    }
    // Both columns hold integers, so the schema uses IntegerType for each
    val customSchema = StructType(Array(
      StructField("PassengerId", IntegerType, false),
      StructField("Survived", IntegerType, false)))
    val resultDf = sqlContext.createDataFrame(resultRDD, customSchema)

Save results

Now we have the result and want to save it to a CSV file for the Kaggle submission. To avoid writing the CSV ourselves, we converted the RDD to structured data (a DataFrame) and can use the Databricks CSV library.

    resultDf
      .coalesce(1) // only one file (one partition)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("results/out_" + System.currentTimeMillis())

After the submission, we can see that the result isn't that bad, but it's also not very good. So what about the gender? Take a look at the GitHub code!

Thanks for reading! Please comment if you have any thoughts!

June 5, 2016
