Wednesday, August 8, 2018

Scala Spark: register a string as a UDF using reflection

I need to register a string as a UDF at runtime. I will receive a POST request in JSON format, which includes:

{
    "udfName":"foo",
    "udfBody":"(time: String) => {
      if (!time.toString.matches("\\d+")) 0
      else {
        time.toString.toDouble / 1000000
      }
     }",
    "inputTypes":["String"],
    "outputType":"Double"
}

Here is my approach:

1. Compile udfBody into a function (using ToolBox).
2. Treat the compiled value as the right FunctionX, where X ranges from 1 to 22 (Function1 to Function22 in Scala). For example, if inputTypes contains 3 elements, I will use Function3. Since FunctionX is a trait, I don't know how to get it using reflection.
3. Register the function as a UDF.
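Step 2 can be sketched with plain JVM reflection, without Spark. This is a minimal sketch, not a definitive answer: `FunctionLookup`, `functionClass`, and `invoke` are hypothetical names introduced here for illustration. It assumes the FunctionN traits can be loaded by name as `scala.Function0` to `scala.Function22`, and that after erasure every `apply` parameter is an `Object`.

```scala
object FunctionLookup {
  // Resolve the scala.FunctionN trait for a given arity (0 to 22).
  def functionClass(arity: Int): Class[_] = {
    require(arity >= 0 && arity <= 22, s"unsupported arity: $arity")
    Class.forName(s"scala.Function$arity")
  }

  // Call `apply` through the erased interface method:
  // after erasure, every parameter of FunctionN.apply is an Object.
  def invoke(fn: AnyRef, args: Seq[AnyRef]): AnyRef = {
    val cls = functionClass(args.length)
    require(cls.isInstance(fn), s"value is not a ${cls.getName}")
    val paramTypes: Seq[Class[_]] = Seq.fill(args.length)(classOf[Object])
    cls.getMethod("apply", paramTypes: _*).invoke(fn, args: _*)
  }

  def main(args: Array[String]): Unit = {
    // Same logic as the udfBody in the request, written as a normal lambda.
    val toSeconds: String => Double =
      time => if (!time.matches("\\d+")) 0.0 else time.toDouble / 1000000
    println(invoke(toSeconds, Seq("2000000")))
  }
}
```

This only finds the trait and calls the function dynamically. It does not recover the static argument and return types, which the typed `register` overloads in Spark still need.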

import org.apache.spark.sql.UDFRegistration
import org.apache.spark.sql.expressions.UserDefinedFunction
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

val ss = SparkSessionHolder.getSparkSessionHolder().getOrCreate()
val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox() // get a ToolBox instance
val udfBody = json.get("udfBody") // get udfBody from the JSON
// The arity X is not known beforehand, so this hard-coded cast is what I want to replace with reflection.
val function = tb.eval(tb.parse(udfBody)).asInstanceOf[Function1[String, Double]]
ss.udf.register("foo", function)

So, how can I get the FunctionX using reflection?
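One possible direction, offered as a sketch rather than a confirmed solution, is to skip the fixed cast and discover the arity from the runtime type of the compiled value. `UdfArity`, `compile`, and `arityOf` are hypothetical names, and the sketch assumes scala-compiler is on the classpath (as ToolBox already requires).

```scala
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object UdfArity {
  private val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()

  // Compile the source text of a function literal into a live object.
  def compile(body: String): Any = tb.eval(tb.parse(body))

  // Discover the arity from the runtime type instead of casting to a fixed FunctionN.
  def arityOf(fn: Any): Int = fn match {
    case _: Function0[_]          => 0
    case _: Function1[_, _]       => 1
    case _: Function2[_, _, _]    => 2
    case _: Function3[_, _, _, _] => 3
    // Extend with further cases up to Function22 as needed.
    case other =>
      throw new IllegalArgumentException(s"not a supported function: $other")
  }
}
```

For example, `UdfArity.arityOf(UdfArity.compile("(a: String, b: Int) => a.length + b"))` should yield 2. The typed `register` overloads in Spark take their argument and return types from implicit TypeTags resolved at compile time, so each case would still need a cast to concrete types before registration. This sketch covers only the arity discovery.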




