I am trying to convert an RDD of objects to a DataFrame, with the peculiarity that said objects have lazy values.
To do this without having to specify the schema of each object by hand, I have created a trait that converts the object to JSON format (or any other text format). The code is the following:
import java.text.SimpleDateFormat
import java.util.Date

import scala.reflect.runtime.universe._

trait Formattable {

  lazy val fieldsMap: Map[String, Any] = {
    val mirror: Mirror = runtimeMirror(this.getClass.getClassLoader)
    val classSymbol: ClassSymbol = mirror.classSymbol(this.getClass)
    val instanceMirror: InstanceMirror = mirror.reflect(this)

    // Lazy vals show up as members whose symbol prints with the "lazy" flag,
    // so they are collected separately from the plain (non-method) fields.
    val laziesNames = classSymbol.info.decls
      .filter(_.toString.matches("(^|.* )lazy.*$"))
      .map(_.name.toString.trim)
    val nonMethodsNames = classSymbol.info.decls
      .filter(!_.isMethod)
      .map(_.name.toString.trim) // field names carry a trailing space, hence the trim
    val nonLaziesNames = nonMethodsNames.toSet -- laziesNames.toSet

    // Lazy vals must be read through their accessor method (which forces them);
    // regular fields can be read directly.
    (laziesNames
      .map(name => name -> instanceMirror.reflectMethod(classSymbol.toType.member(TermName(name)).asMethod)()) ++
      nonLaziesNames
        .map(name => name -> instanceMirror.reflectField(classSymbol.toType.member(TermName(name)).asTerm).get)).toMap
  }

  def toJSON: String = {
    val fieldsStr = fieldsMap.map(e => s""""${e._1.trim}": ${format(e._2)}""").mkString(",")
    s"""{$fieldsStr}"""
  }

  def format(raw: Any, listSeparator: String = ","): String =
    raw match {
      case null => "\"\"" // render null as an empty JSON string
      case _: String => s""""${raw.toString}""""
      case _: Char => s""""${raw.toString}""""
      case _: Boolean => raw.toString
      case _: Int => raw.toString
      case _: Float => raw.toString
      case _: Double => raw.toString
      case list: List[_] =>
        s""""${list.map(format(_, listSeparator)).mkString(listSeparator).replace('"', '\'')}""""
      case map: Map[_, _] =>
        s""""${map.values.map(format(_, listSeparator)).mkString(listSeparator).replace('"', '\'')}""""
      case opt: Option[_] =>
        s""""${opt.map(_.toString).getOrElse("")}""""
      case obj: Product =>
        val objClassMirror: Mirror = runtimeMirror(obj.getClass.getClassLoader)
        val objClassSymbol: ClassSymbol = objClassMirror.classSymbol(obj.getClass)
        val objInstanceMirror: InstanceMirror = objClassMirror.reflect(obj)
        s""""${
          objClassSymbol.info.decls
            .filter(!_.isMethod) // keep only the attributes
            .map(sym => format(objInstanceMirror.reflectField(sym.asTerm).get, listSeparator))
            .mkString(listSeparator)
        }""""
      case date: Date => s""""${new SimpleDateFormat("yyyy-MM-dd").format(date)}""""
      case _ => s""""${raw.toString}""""
    }
}
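To give a concrete idea of what the trait produces, here is a minimal, made-up example (the class and the values are purely illustrative, not my real code):

case class Point(x: Int, label: String) extends Formattable {
  lazy val doubled: Int = x * 2
}

println(Point(3, "a").toJSON)
// Prints something like (field order comes from the Map, so it is not guaranteed):
// {"doubled": 6, "x": 3, "label": "a"}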
Then I have my classes extend this trait, convert them to JSON by calling toJSON, and read them back with spark.read.json:
case class Test(
  field1: String,
  field2: Double,
  ...
  fieldN: Double
) extends Formattable {
  lazy val lazyField1: Double = field2 * fieldN
  lazy val lazyField2: Double = fieldN / field2
  lazy val lazyField3: Double = lazyField1 * lazyField2
  ...
}
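For reference, the point of all this is that the lazy fields end up as ordinary columns. An instance such as Test("a", 2.0, ..., 4.0) (values invented for the example) would serialize to something like:

{"lazyField1": 8.0, "lazyField2": 2.0, "lazyField3": 16.0, "field1": "a", "field2": 2.0, ..., "fieldN": 4.0}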
The real code is somewhat more complex, which is why I was forced to use lazy values. The problem comes when I try to convert an RDD of objects of this type to a DataFrame: the whole action runs on a single node, so I lose the parallelization power of Spark:
val spark: SparkSession = ...
val myRDD: RDD[Test] = ...
import spark.implicits._
val df: DataFrame = spark.read.json(myRDD.map(_.toJSON).toDS())
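For what it is worth, this is roughly how the behaviour can be observed (a diagnostic sketch, not my actual job; the comments describe what I would expect):

val jsonDS = myRDD.map(_.toJSON).toDS()
println(jsonDS.rdd.getNumPartitions) // keeps the original partitioning
val df = spark.read.json(jsonDS)     // schema inference scans the data here
println(df.rdd.getNumPartitions)
df.printSchema()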
To put you in context, I am using the following versions:
Spark 2.3.0, Scala 2.11.0, Master: yarn-client
Do you have any idea why this might be happening? Thank you very much in advance.