Friday, August 10, 2018

Actions on RDDs are executed on a single node when the elements contain lazy values evaluated by reflection

I am trying to convert an RDD of objects to a DataFrame, with the peculiarity that these objects contain lazy values.

To do this without having to specify each object's schema by hand, I have created a trait that converts the object to JSON (or any other text format). The code is as follows:

import java.text.SimpleDateFormat
import java.util.Date

import scala.reflect.runtime.universe._

trait Formattable {

  lazy val fieldsMap: Map[String, Any] = {
    val mirror: Mirror = runtimeMirror(this.getClass.getClassLoader)
    val classSymbol: ClassSymbol = mirror.classSymbol(this.getClass)
    val instanceMirror: InstanceMirror = mirror.reflect(this)

    // Lazy vals carry a "lazy" modifier in their symbol's toString; plain attributes are the non-method declarations
    val laziesNames = classSymbol.info.decls.filter(_.toString.matches("(^|.* )lazy.*$")).map(_.name.toString.trim)
    val nonMethodsNames = classSymbol.info.decls.filter(!_.isMethod).map(_.name.toString.trim)
    val nonLaziesNames = nonMethodsNames.toSet -- laziesNames.toSet

    // Lazy vals are read through their accessor method (which forces their evaluation);
    // plain fields are read directly.
    (laziesNames
      .map(name => name -> instanceMirror.reflectMethod(classSymbol.toType.member(TermName(name)).asMethod)()) ++
      nonLaziesNames
        .map(name => name -> instanceMirror.reflectField(classSymbol.toType.member(TermName(name)).asTerm).get)).toMap
  }

  def toJSON: String = {
    val fieldsStr =
      fieldsMap.map(e => s""""${e._1.trim}": ${format(e._2)}""").mkString(",")
    s"""{$fieldsStr}"""
  }

  def format(raw: Any, listSeparator: String = ","): String =
    raw match {
      case null => """""""" // render null as an empty JSON string: ""
      case _: String => s""""${raw.toString}""""
      case _: Char => s""""${raw.toString}""""
      case _: Boolean => raw.toString
      case _: Int => raw.toString
      case _: Float => raw.toString
      case _: Double => raw.toString
      case list: List[_] =>
        // Join the formatted elements, replacing inner double quotes with single quotes
        s""""${list.map(format(_, listSeparator)).mkString(listSeparator).replace('"', '\'')}""""
      case map: Map[_, _] =>
        s""""${map.values.map(format(_, listSeparator)).mkString(listSeparator).replace('"', '\'')}""""
      case opt: Option[_] =>
        s""""${opt.map(_.toString).getOrElse("")}""""
      case obj: Product =>
        // Nested case classes: reflect over their attributes and format each one
        val objClassMirror: Mirror = runtimeMirror(obj.getClass.getClassLoader)
        val objClassSymbol: ClassSymbol = objClassMirror.classSymbol(obj.getClass)
        val objInstanceMirror: InstanceMirror = objClassMirror.reflect(obj)
        s""""${
          objClassSymbol.info.decls
            .filter(!_.isMethod) // Keep only attributes
            .map(sym => format(objInstanceMirror.reflectField(sym.asTerm).get, listSeparator))
            .mkString(listSeparator)
        }""""
      case date: Date => s""""${new SimpleDateFormat("yyyy-MM-dd").format(date)}""""
      case _ => s""""${raw.toString}""""
    }


}
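
To illustrate, format renders a few representative inputs like this (a quick sketch; Fmt is just a hypothetical object mixing in the trait):

object Fmt extends Formattable

Fmt.format("abc")          // "abc"   (strings are quoted)
Fmt.format(1.5)            // 1.5     (numbers stay bare)
Fmt.format(List(1, 2, 3))  // "1,2,3" (elements joined with listSeparator, then quoted)
Fmt.format(Some(42))       // "42"
Fmt.format(null)           // ""      (null becomes an empty JSON string)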

Then I have my classes extend this trait, convert each object to JSON by calling toJSON, and read the result with the JSON reader provided by SparkSession:

case class Test (
                  field1: String,
                  field2: Double,
                  ...
                  fieldN: Double
                ) extends Formattable {
  lazy val lazyField1: Double = field2 * fieldN
  lazy val lazyField2: Double = fieldN / field2
  lazy val lazyField3: Double = lazyField1 * lazyField2
  ...
}
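
To make the expected output concrete, here is a minimal sketch with a hypothetical two-field case class (the field order in the resulting JSON comes from reflection and may vary):

case class Mini(field1: String, field2: Double) extends Formattable {
  lazy val lazyField1: Double = field2 * 2
}

Mini("a", 2.0).toJSON
// e.g. {"lazyField1": 4.0,"field1": "a","field2": 2.0}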

My actual code is somewhat more complex, which is why I was forced to use lazy values. The problem arises when I try to convert an RDD of objects of this type to a DataFrame: the action is executed on a single node, losing the power of Spark's parallelization:

val spark: SparkSession = ...
val myRDD: RDD[Test] = ... 

import spark.implicits._
val df: DataFrame = spark.read.json(myRDD.map(_.toJSON).toDS())
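
To see where the mapping actually runs, a diagnostic sketch along these lines can be used (recordsPerHost is just an illustrative name; it counts records per executor host):

// If all records land on a single host, the mapping is indeed not parallelized
val recordsPerHost = myRDD
  .map(_.toJSON)
  .mapPartitions { it =>
    val host = java.net.InetAddress.getLocalHost.getHostName
    Iterator(host -> it.size)
  }
  .reduceByKey(_ + _)
  .collect()

recordsPerHost.foreach(println)
println(s"partitions: ${myRDD.getNumPartitions}")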

For context, these are the versions I am using:

Spark 2.3.0, Scala 2.11.0, Master: yarn-client

Do you have any idea why this might be? Thank you very much in advance.
