Tuesday, December 20, 2016

Spark Dataframe schema definition using reflection with case classes and column name aliases

I ran into a little problem with my Spark Scala script. Basically, I have raw data on which I am doing aggregations, and after grouping and counting etc. I want to save the output in a specific JSON format.

  1. Let's say I have raw data in a dataframe containing nested structs (I am using only one struct in this example); its schema is sketched below the snippet.

    case class Person(Name: String, Gender: String, Age: Int)
    var p1 = Person("James", "male", 32)
    var p2 = Person("James", "male", 36)
    var p3 = Person("Anna", "female", 25)
    var dm = sqlContext.createDataFrame(Seq(("1",p1),("2",p2),("3",p3))).toDF("Id", "Person")
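
    For reference, dm.printSchema should look roughly like this (the nullability flags may vary slightly between Spark versions):

    root
     |-- Id: string (nullable = true)
     |-- Person: struct (nullable = true)
     |    |-- Name: string (nullable = true)
     |    |-- Gender: string (nullable = true)
     |    |-- Age: integer (nullable = false)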
    
    
  2. Then I have a list of column names which should be used for grouping. The thing is that the number of these column names may vary, therefore I want to add these grouping values as a new column of string arrays (point 7 below).

    For example, the following grouping parameters have been entered from an outside system (which is not aware of the nested structure):

    val cols = Seq("PersonName", "PersonGender")
    
    
  3. But since in Spark I have to use dot notation to access the child attributes, i.e. Person.Name, I have to do some manual mapping of the names. I have a helper function for that:

    def matchColHeaders(name: String): String = name match {
      case "PersonName"   => "Person.Name"
      case "PersonGender" => "Person.Gender"
      case "PersonAge"    => "Person.Age"
      case _              => name
    }
    
    import org.apache.spark.sql.functions._  // needed for col (and later count and array)

    var groupByList: Seq[org.apache.spark.sql.Column] = cols.map(p => col(matchColHeaders(p)).as(p))
    groupByList: Seq[org.apache.spark.sql.Column] = List(Person.Name AS PersonName#76, Person.Gender AS PersonGender#77)
    
    
  4. I am currently using the initial grouping parameters as aliases, for clarity. The main reason is that the raw data may contain different kinds of structs which also have an attribute called "Name"; if I only used the helper function to map the initial string Seq to a new string Seq (rather than a Column Seq), then using Person.Name as a grouping parameter would leave Name as the resulting column name. That way the result could contain identical column names, and selecting by those columns would throw an exception (see the sketch below for an illustration).
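
    For illustration, here is a minimal sketch of that name clash. The Pet struct and the Entry wrapper are purely hypothetical and exist only for this example; the exact error text depends on the Spark version:

    case class Pet(Name: String, Age: Int)
    case class Entry(Person: Person, Pet: Pet)
    val clashDF = sqlContext.createDataFrame(Seq(Entry(p1, Pet("Rex", 3))))

    // Without aliases, both selected columns end up being called plain "Name"
    val ambiguous = clashDF.select(col("Person.Name"), col("Pet.Name"))
    // ambiguous.select("Name") then fails with something like:
    // org.apache.spark.sql.AnalysisException: Reference 'Name' is ambiguous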

  5. Then I use this list in the groupBy function and give the aggregated count column an alias:

    val ncColName = "NameCount"
    val grouped = dm.groupBy(groupByList:_*).agg(count("Id").as(ncColName))
    
    
  6. This currently results in a dataframe with 3 columns:

    +----------+------------+---------+
    |PersonName|PersonGender|NameCount|
    +----------+------------+---------+
    |      Anna|      female|        1|
    |     James|        male|        2|
    +----------+------------+---------+
    
    
  7. But as the number of grouping parameters may differ, I want to add them all to a string array column (because when I want to use case classes later, I cannot add attributes to a case class dynamically); the expected schema is sketched below the snippet:

    import org.apache.spark.sql.functions._
    val gvColName = "GroupedValues"
    val groupedArrayDF = grouped.withColumn(gvColName, array((cols.map(c => col(c))):_*))
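
    For reference, groupedArrayDF.printSchema should then look roughly like this (again, nullability flags may differ between Spark versions):

    root
     |-- PersonName: string (nullable = true)
     |-- PersonGender: string (nullable = true)
     |-- NameCount: long (nullable = false)
     |-- GroupedValues: array (nullable = false)
     |    |-- element: string (containsNull = true)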
    
    
  8. Now when I create a case class with the structure I want for the resulting dataframe (which I want to write out as JSON) and map each row to an object of that class using the column name variables (ncColName and gvColName), I get a "Task not serializable" exception:

    case class Result(GroupValues: Seq[String], Count: Long)
    
    val result = groupedArrayDF.map(row => Result(row.getAs(gvColName), row.getAs(ncColName))).toDF
    
    org.apache.spark.SparkException: Task not serializable
    ...
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
    Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: Person.Name AS PersonName#54)
    
    

    This exception is thrown by the combination of using aliased column names and using variables in the getAs function. It doesn't matter whether I use string variables for the column names or int variables for the indices; it still fails either way. A guess at the cause and a possible mitigation are sketched below.
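
    Since the object that fails to serialize is a Column from groupByList, which the mapping closure does not actually need, my guess is that the closure drags along the surrounding scope that still holds those Column objects. A minimal mitigation sketch, assuming that guess is correct (I have not verified that it helps in the REPL):

    // Copy the names into plain local strings inside a tight scope, so the closure
    // shipped to the executors only captures serializable values.
    val result = {
      val gv = gvColName  // plain String
      val nc = ncColName  // plain String
      groupedArrayDF.map(row => Result(row.getAs[Seq[String]](gv), row.getAs[Long](nc))).toDF
    }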

  9. When I hardcode the column names or indices, everything works, but I want to use variables rather than hardcoded string values in my code:

    scala> val result = groupedArrayDF.map(row => Result(row.getAs("GroupedValues"), row.getAs("NameCount"))).toDF
    result: org.apache.spark.sql.DataFrame = [GroupValues: array<string>, Count: bigint]
    
    

    And when I use Seq("Person.Name", "Person.Gender"), everything also works with variables for the column names or indices. But then I run into the problem explained in point 4.

  10. And actually the final schema should be an array of these Result class objects. I still haven't figured out how to do this either; one untested idea is sketched after the snippet. The expected result should have a schema like this:

    case class Test(var FilteredStatistics: Array[Result])
    val t = Test(Array(Result(Seq("Anna", "female"), 10L), Result(Seq("James", "male"), 22L)))
    
    val t2 = sc.parallelize(Seq(t)).toDF
    
    scala> t2.printSchema
    root
     |-- FilteredStatistics: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- GroupValues: array (nullable = true)
     |    |    |    |-- element: string (containsNull = true)
     |    |    |-- Count: long (nullable = false)
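
    One untested idea for wrapping the grouped rows into that schema, assuming the grouped data is small enough to collect to the driver (the output path below is just a placeholder):

    // Collect the grouped rows to the driver, build the Result objects locally,
    // wrap them in Test and write the resulting single-row dataframe out as JSON.
    val stats: Array[Result] = groupedArrayDF.collect().map(row =>
      Result(row.getAs[Seq[String]](gvColName), row.getAs[Long](ncColName)))

    val finalDF = sc.parallelize(Seq(Test(stats))).toDF
    finalDF.write.json("/tmp/filtered-statistics")  // placeholder path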
    
    

TL;DR:

  1. How to map dataframe rows to a case class object when dataframe columns have aliases and variables are used for column names?

  2. How to add these case class objects to an array?




