Thursday, 11 February 2016

Scala reflection with Serialization (over Spark)

To begin with, I'm using Scala 2.10.4 and the example below is run on Spark 1.6 (though I doubt Spark has anything to do with this; it looks like a plain serialization issue).

So here's my problem: assume I have a trait Base that is implemented by, say, two classes B1 and B2. I also have a generic trait Foo[T] that is extended by a collection of classes, one of which is parameterized over subtypes of Base. For example (I keep Spark's notion of RDD here, but it could be something else; Something is just a result type, whatever it actually is):

trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something  = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something  = ... }
...

Now I need an object that takes an RDD[T] (assume no ambiguity here; this is a simplified version) and returns the Something produced by the function that corresponds to type T. It should also work for Array[T], using a merging strategy. So far it looks like:

object Obj {
   def compute[T: TypeTag](input: RDD[T]): Something = {
      typeOf[T] match {
         case t if t <:< typeOf[A] => 
            val foo = new Foo[T]
            foo.function(input)
         case t if t <:< typeOf[Array[A]] => 
            val foo = new Foo[A]
            foo.function(input.map(x => mergeArray(x.asInstanceOf[Array[A]])))
         case t if t <:< typeOf[Base] => 
            val foo = new Foo[T]
            foo.function(input)
         // here it gets ugly...
         case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
            val tt = getSubInfo[T](0)
            val tpe = tt.tpe
            val foo = new Foo[tpe.type]
            foo.function(input.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))))
      }
   }

   private def mergeArray[T: TypeTag](a: Array[T]): T = ... // strategy to transform an array of T into a single T when possible

   // extracts the type argument at position i, e.g. for Array[Int], position 0 yields Int
   // (I can provide the code, but it is not fundamental to understanding the problem)
   private def getSubInfo[T: TypeTag](i: Int) = ... 
}
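
One likely reading of the "doesn't fall through with Array[Base]" comment is that Array is invariant in Scala, so an Array[B1] is not a subtype of Array[Base] and only the existential Array[_] matches it. A minimal sketch of that check, assuming scala-reflect is on the classpath (Base and B1 mirror the names in the question; VarianceCheck is a hypothetical name used only for illustration):

```scala
import scala.reflect.runtime.universe.typeOf

object VarianceCheck {
  trait Base
  class B1 extends Base

  // Array is invariant in its element type: Array[B1] does not conform to Array[Base].
  def arrayOfSubtype: Boolean = typeOf[Array[B1]] <:< typeOf[Array[Base]]

  // The existential type Array[_] accepts an array of any element type.
  def arrayOfAnything: Boolean = typeOf[Array[B1]] <:< typeOf[Array[_]]

  // List is covariant, shown for contrast: List[B1] conforms to List[Base].
  def listOfSubtype: Boolean = typeOf[List[B1]] <:< typeOf[List[Base]]
}
```

If this is indeed the cause, the element type has to be extracted and compared on its own (which is what getSubInfo seems to be for) rather than comparing the array types directly.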

Unfortunately, this seems to work fine on a local machine, but when it gets sent to Spark (serialized), I get an org.apache.spark.SparkException: Task not serializable with:

Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)

I do have a workaround (the obvious one: enumerate the possibilities), but out of curiosity, is there a way to fix this? And why aren't Symbols serializable when their equivalents in Manifests were?
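
To narrow down which captured value breaks the task, the closure can be pushed through plain Java serialization outside Spark. The sketch below is only an illustration under assumptions: ClosureCapture, serializes, withType and withClassTag are hypothetical names, and it assumes (based on the stack trace above) that the closure in the last case ends up holding a reflection Type. It contrasts that with a closure holding only a ClassTag, which is declared Serializable in the standard library.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.Type

object ClosureCapture {
  // Returns true when plain Java serialization of `value` succeeds,
  // false when some object reachable from it is not Serializable.
  def serializes(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Closure that holds on to a reflection Type (assumed to mirror the failing case).
  def withType(tpe: Type): Any => String =
    x => s"$x: $tpe"

  // Closure that holds only a ClassTag, resolved before the closure is built.
  def withClassTag[T](implicit ct: ClassTag[T]): Any => Boolean =
    x => ct.runtimeClass.isInstance(x)
}
```

Calling ClosureCapture.serializes(ClosureCapture.withType(typeOf[Array[Int]])) should show whether the Type is the culprit on a given Scala version. If it is, one option is to do all the TypeTag-based dispatch before building the closure and pass only serializable values (a ClassTag, a Class, or a plain function) into rdd.map.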

Thanks for the help.




