Spark SQL is used to process structured data. I ran into a problem when I wanted to perform operations per partition (connect to a web service etc.) and add fields to the original data after reading it with the new DataFrame API.
Consider the following code, which adds a column “additional” by transforming a column “original” with a user-defined function (and a “brand_id” column in the same way):
val df = sqlContext.read.avro(source)
val expandedDF = df
  .withColumn("additional", transformer(df("original")))
  .withColumn("brand_id", brandIdUdf(df("attributes")))
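(The snippet assumes transformer and brandIdUdf already exist; a minimal sketch with made-up logic, just to keep the example self-contained, could look like this:)

import org.apache.spark.sql.functions.udf

// Hypothetical UDFs with stand-in logic, only for illustration.
val transformer = udf { original: String => original.toUpperCase }
val brandIdUdf  = udf { attributes: String => attributes.split(",").headOption.getOrElse("") }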
We have a web service that replaces the UDF, and this web service has a client that is instantiated with Spring. We added Spring to our project as usual, and now we can get a Spring context with:
val context = new ClassPathXmlApplicationContext("classpath:sparkSpringContext.xml")
val myService = context.getBean("myService").asInstanceOf[MyService]
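For context, the bean behind "myService" could look something like this hypothetical interface (name and method are assumptions, matching the calls used further below):

// Hypothetical interface of the Spring-managed web service client.
trait MyService {
  def getMyNewColum(value: String): String
}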
Note: If you can avoid calling a web service in the Spark job, do it (e.g. get the service data to HDFS first)!
How to deal with parallelism?
Now we need to recap how Spark works. Spark uses partitions for parallelism; each partition is processed by an executor (usually a thread on a data node). Since we don't want to instantiate a Spring context for each record in our data (which would happen if we initialized the Spring context inside the UDF), we have to use a function that is executed per partition. On RDDs there is a method called foreachPartition() and a method called mapPartitions().
Like foreach() and map(), foreachPartition() is an action and mapPartitions() is a transformation (lazy). For this example we add a column to each record, so we have to use mapPartitions(), because we return an RDD with a different structure. If we just wanted to do a PUT request to the web service and not update the current RDD (RDDs are immutable), we could use foreachPartition().
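For that side-effect-only case, a minimal sketch could look like this (putToService is a hypothetical method on the client, not part of the example above):

import org.springframework.context.support.ClassPathXmlApplicationContext

df.rdd.foreachPartition { iter =>
  // executor level: one Spring context per partition
  val context = new ClassPathXmlApplicationContext("classpath:sparkSpringContext.xml")
  val myService = context.getBean("myService").asInstanceOf[MyService]
  iter.foreach(item => myService.putToService(item.getAs[String]("original"))) // hypothetical PUT call
}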
Inside the mapPartitions function we are now on the executor level. We get an iterator over the partition's local data, can go through it, and return a new iterator, which becomes the new RDD. When dealing with Avro data, we get an RDD[Row] and also return an RDD[Row]. But Row is an interface and immutable, so how do we expand it?
The answer is you can't, because all RDDs are by definition immutable, so we need to create a new one. This code does the job:
val df = sqlContext.read.avro(source)
val expandedDf = df.rdd.mapPartitions(iter => {
  // executor level: instantiate the Spring context once per partition
  val context = new ClassPathXmlApplicationContext("classpath:sparkSpringContext.xml")
  val myService = context.getBean("myService").asInstanceOf[MyService]
  iter.map(item => {
    // pass whatever record-specific value the service needs
    val myNewColumn = myService.getMyNewColum(item.getAs[String]("anyRecordSpecificValue"))
    Row.fromSeq(item.toSeq :+ myNewColumn)
  })
}, preservesPartitioning = true)
Now our new RDD[Row] is almost ready; only one thing is missing: the schema. Because we created new Rows, we need to provide the new schema explicitly.
We can take the old schema, extend it with our new column, and create a DataFrame from it:
val newSchema = df.schema.add(StructField("additional", StringType, true)) // nullable string
val myFancyExpandedDf = df.sqlContext.createDataFrame(expandedDf, newSchema)
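A quick sanity check on the result, e.g.:

myFancyExpandedDf.printSchema() // should now list the "additional" field
myFancyExpandedDf.show(5)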