I ran into a little problem with my Spark Scala script. Basically, I have raw data on which I do aggregations; after grouping, counting, etc., I want to save the output in a specific JSON format.
1. Let's say I have raw data in a dataframe with nested structs inside (I am using only one in this example):

    case class Person(Name: String, Gender: String, Age: Int)

    var p1 = Person("James", "male", 32)
    var p2 = Person("James", "male", 36)
    var p3 = Person("Anna", "female", 25)

    var dm = sqlContext.createDataFrame(Seq(("1", p1), ("2", p2), ("3", p3))).toDF("Id", "Person")
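For clarity, the schema of dm should look roughly like this (a sketch of the expected output):

    scala> dm.printSchema
    root
     |-- Id: string (nullable = true)
     |-- Person: struct (nullable = true)
     |    |-- Name: string (nullable = true)
     |    |-- Gender: string (nullable = true)
     |    |-- Age: integer (nullable = false)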
2. Then I have a list of column names which should be used for grouping. The number of these column names may vary, which is why I want to add the grouping values as a new column of string arrays. For example, the following grouping parameters have been entered from an outside system (which is not aware of the nested structure):

    val cols = Seq("PersonName", "PersonGender")
3. But since in Spark I have to use full stops to get the child attributes, i.e. Person.Name, I have to do some manual mapping of the names. I have a helper function for that:

    import org.apache.spark.sql.functions.col

    def matchColHeaders(name: String): String = name match {
      case "PersonName"   => "Person.Name"
      case "PersonGender" => "Person.Gender"
      case "PersonAge"    => "Person.Age"
      case _              => name
    }

    var groupByList: Seq[org.apache.spark.sql.Column] = cols.map(p => col(matchColHeaders(p)).as(p))

    groupByList: Seq[org.apache.spark.sql.Column] = List(Person.Name AS PersonName#76, Person.Gender AS PersonGender#77)
4. I am currently using the initial grouping parameters as aliases, for clarity. Mostly because a situation may arise where the raw data contains different types of structs which also have attributes called "Name". If I just used the helper function to map the initial string Seq to a new string Seq (not a column Seq), then using Person.Name as a grouping parameter would leave Name as the resulting column name. That way the result may contain identical column names, which would make selecting by these columns throw an exception.
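To illustrate that duplicate-name risk, here is a minimal sketch (the Company struct is hypothetical and not part of my actual data):

    // Without an alias, grouping by "Person.Name" yields a result column named just "Name":
    dm.groupBy(col("Person.Name")).count().printSchema
    // root
    //  |-- Name: string (nullable = true)
    //  |-- count: long (nullable = false)

    // If the raw data also contained, say, a hypothetical Company struct with its own
    // Name field, grouping by both would produce two columns called "Name", and
    // selecting by that name would fail because the reference is ambiguous:
    // dm.groupBy(col("Person.Name"), col("Company.Name")).count().select("Name")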
5. Then I use this list in the groupBy function and give the count column an alias:

    val ncColName = "NameCount"
    val grouped = dm.groupBy(groupByList: _*).agg(count("Id").as(ncColName))
6. This currently results in a dataframe with 3 columns:

    +----------+------------+---------+
    |PersonName|PersonGender|NameCount|
    +----------+------------+---------+
    |      Anna|      female|        1|
    |     James|        male|        2|
    +----------+------------+---------+
7. But as the number of grouping parameters may differ, I want to add them all to a string array (because when I want to use case classes later, I cannot add attributes to a case class dynamically):

    import org.apache.spark.sql.functions._

    val gvColName = "GroupedValues"
    val groupedArrayDF = grouped.withColumn(gvColName, array(cols.map(c => col(c)): _*))
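For reference, the schema of groupedArrayDF should then look roughly like this (a sketch; the exact nullability flags may differ):

    scala> groupedArrayDF.printSchema
    root
     |-- PersonName: string (nullable = true)
     |-- PersonGender: string (nullable = true)
     |-- NameCount: long (nullable = false)
     |-- GroupedValues: array (nullable = false)
     |    |-- element: string (containsNull = true)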
8. Now, when I create a case class with the structure I want for the resulting dataframe (which I want to write out as JSON) and map each row to an object of that class using the variables holding the column names (ncColName and gvColName), I get a "Task not serializable" exception:

    case class Result(GroupValues: Seq[String], Count: Long)

    val result = groupedArrayDF.map(row => Result(row.getAs(gvColName), row.getAs(ncColName))).toDF

    org.apache.spark.SparkException: Task not serializable
    ...
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
    Serialization stack:
        - object not serializable (class: org.apache.spark.sql.Column, value: Person.Name AS PersonName#54)
This exception is thrown by the combination of using column names with aliases and using variables in the getAs function. It doesn't matter whether I use string variables for the column names or int variables for the indices; it still fails.
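For completeness, this is roughly what I mean by the index variant (a sketch; the gvIdx, ncIdx and resultByIdx names are just illustrative):

    // Sketch of the index-based variant: the indices are plain Ints computed from the
    // column names, yet the same NotSerializableException shows up, presumably because
    // the closure still drags in the object that holds the Seq[Column] with the aliases.
    val gvIdx = groupedArrayDF.columns.indexOf(gvColName)
    val ncIdx = groupedArrayDF.columns.indexOf(ncColName)
    val resultByIdx = groupedArrayDF.map(row =>
      Result(row.getAs[Seq[String]](gvIdx), row.getAs[Long](ncIdx))).toDF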
9. When I hardcode the column names or indices, everything works, but I want to use variables instead of hardcoded string values in my code:

    scala> val result = groupedArrayDF.map(row => Result(row.getAs("GroupedValues"), row.getAs("NameCount"))).toDF
    result: org.apache.spark.sql.DataFrame = [GroupValues: array<string>, Count: bigint]
And when I use Seq("Person.Name", "Person.Gender") as the grouping columns, everything also works with variables for the column names or indices. But then I run into the problem explained in point 4.
10. And actually, the final schema should be an array of these Result class objects. I still haven't figured out how to do that either. The expected result should have a schema like this:

    case class Test(var FilteredStatistics: Array[Result])

    val t = Test(Array(Result(Seq("Anna", "female"), 10L), Result(Seq("James", "male"), 22L)))
    val t2 = sc.parallelize(Seq(t)).toDF

    scala> t2.printSchema
    root
     |-- FilteredStatistics: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- GroupValues: array (nullable = true)
     |    |    |    |-- element: string (containsNull = true)
     |    |    |-- Count: long (nullable = false)
TL;DR:

- How to map dataframe rows to a case class object when the dataframe columns have aliases and variables are used for the column names?
- How to add these case class objects to an array?