[Apache Spark] Dataframe Expressions

Spark 2.0 data frames offer a powerful way of accessing structured data in a SQL-like way. But sometimes it's hard to find the right built-in expression. So I would like to show a few that I have been dealing with. I will use the Dota2 match data from the other post.

Accessing nested data column-like aka explode

If we have nested structured data, Spark does not let us treat the elements of a nested array like rows of their own, so we cannot aggregate on them directly. Say we have a match record which contains an array of 10 players and also the information about the winning team. Now we want to aggregate on a column of the players' data, but the player record itself has no data about the match result, only the team membership. So we need to explode the players array and preserve the match data. As in SQL, this creates a new row for each player and duplicates the surrounding match columns (Spark does optimize here).

To do so we can do the following:

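    import org.apache.spark.sql.functions.explode
    import matchesDf.sqlContext.implicits._

    // isWinner is a UDF defined elsewhere; it derives the win flag from radiant_win and the player's slot
    val playerWithWinStatus = matchesDf
      .select($"col.match_id", $"col.radiant_win", explode($"col.players").as("players"))
      .withColumn("winner", isWinner($"radiant_win", $"players.player_slot"))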

The created data has 10 records per match, each containing the “winner” flag, the “match_id”, the “radiant_win” flag and the player record.
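The isWinner UDF is not part of the snippet above. A minimal sketch of what it could look like follows. It assumes the Dota 2 convention that the highest bit of the 8-bit player_slot marks the team (slots 0 to 4 are Radiant, 128 to 132 are Dire); the names WinnerSketch and playerWon are made up for illustration and are not from the original code.

```scala
import org.apache.spark.sql.functions.udf

object WinnerSketch {
  // Assumption: bit 0x80 of player_slot is set for Dire players and clear for Radiant players.
  def playerWon(radiantWin: Boolean, playerSlot: Int): Boolean = {
    val isRadiant = (playerSlot & 0x80) == 0
    // A player wins if their team matches the winning side.
    isRadiant == radiantWin
  }

  // Hypothetical stand-in for the isWinner UDF used in the post.
  val isWinner = udf(playerWon _)
}
```

Keeping the logic in a plain function makes it easy to check without a running Spark session; the UDF is only a thin wrapper around it.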


Grouping into a list aka collect_list and type casting

Say we want to use the created data frame and do some aggregation on the player records, grouped by match_id and by team. What we really want is a list of all heroes which took part in the game on each side, and we want these “hero_ids” as a Seq. The magic expression is called “collect_list”. We also use type casting here.

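    import org.apache.spark.sql.functions.collect_list
    import org.apache.spark.sql.types.{DoubleType, IntegerType}
    val winMatchPlayerDf = playerWithWinStatus
      .select($"winner".cast(DoubleType).as("label"), $"match_id".cast(IntegerType), $"players.hero_id".cast(DoubleType).as("hero"))
      .groupBy($"label", $"match_id")
      .agg(collect_list($"hero").as("herolist"))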

The data now has these columns: “label” (1.0 or 0.0), “match_id” (12345), “herolist” (1.0, 2.0, 3.0..)
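To see collect_list in isolation, here is a small sketch on made-up rows. The object name CollectListSketch, the helper heroLists and the sample values are assumptions for illustration only. Since collect_list gives no guarantee about the order of the collected values, the sketch sorts each list before returning it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

object CollectListSketch {
  // Groups made-up player rows by (label, match_id) and collects the hero ids of each group.
  def heroLists(spark: SparkSession): Map[(Double, Int), Seq[Double]] = {
    import spark.implicits._

    // Made-up sample rows: (label, match_id, hero)
    val players = Seq(
      (1.0, 1, 5.0), (1.0, 1, 7.0),
      (0.0, 1, 9.0), (0.0, 1, 2.0)
    ).toDF("label", "match_id", "hero")

    players
      .groupBy($"label", $"match_id")
      .agg(collect_list($"hero").as("herolist"))
      .collect()
      // Sort each list so the result does not depend on collection order.
      .map(r => (r.getDouble(0), r.getInt(1)) -> r.getSeq[Double](2).sorted)
      .toMap
  }
}
```

With a local session, for example `SparkSession.builder().master("local[1]").getOrCreate()`, the returned map has one entry per team and match, each holding that team's hero ids.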

UDF (user defined function) to make a Sparse occurrences vector and using additional parameters

Now we have a list of hero_ids for each match and we want to do some machine learning. To do so we need a vector of the heroes for each match which only records occurrences, and the index of a given hero must be the same in every match. We use a sparse vector, because we have 111 heroes and do not want to store too many zeros. To accomplish that we define a user defined function, which also takes a parameter that defines the length of the vector.

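    import org.apache.spark.sql.functions.{lit, udf}
    import org.apache.spark.ml.linalg.Vectors // or mllib.linalg.Vectors, depending on the ML API used

    // Count how often each hero id occurs and store the counts in a sparse vector of the given size
    val seqToVec = udf((seq: Seq[Double], size: Int) => {
      val occurrences = seq.groupBy(l => l).map(t => (t._1.toInt, t._2.length.toDouble)).toSeq
      Vectors.sparse(size, occurrences)
    })

    val featuresDf = winMatchPlayerDf.withColumn("features", seqToVec($"herolist", lit(heroCount)))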

As a result we have a dataframe, which has the columns “label” and “features” and also the “match_id”.
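To make the shape of the resulting vector concrete, the counting logic of the UDF can be tried as a plain function on a single list. This is only a sketch: it assumes the Vectors class from org.apache.spark.ml.linalg, and the names SparseOccurrenceSketch and toOccurrenceVector are made up for illustration.

```scala
import org.apache.spark.ml.linalg.{SparseVector, Vectors}

object SparseOccurrenceSketch {
  // Same counting logic as the UDF body: hero id -> number of occurrences.
  def toOccurrenceVector(heroIds: Seq[Double], size: Int): SparseVector = {
    val occurrences = heroIds
      .groupBy(identity)
      .map { case (id, hits) => (id.toInt, hits.length.toDouble) }
      .toSeq
    // The hero id is used as the vector index, so every id must be smaller than size.
    Vectors.sparse(size, occurrences).toSparse
  }
}
```

For the input `Seq(1.0, 3.0, 3.0)` with size 5, the vector has the indices 1 and 3 with the values 1.0 and 2.0; all other positions are implicit zeros. Because the hero id is used directly as the index, the size passed in has to be larger than the largest hero id.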
