Tuesday, 27 February 2018

Spark - Registering UDFs through reflection

I have an object in which I have written my UDF:

object UdfBody {
  def udfUpperCase() = { (s: String, sq: String) => s.concat(sq) }
}

This object lives in a different module, and I want to use the UDF in my Spark processing. So I am using reflection to look up the UDF, and I am trying to register it like this:

def registerUdfFunc() = {
  val udfDf = DataFrameBuilder.getDataFrame("UDF_TABLE", "RDBMS", null, null, null)
  udfDf.collect.foreach { x =>
    registerDynamicUDF(
      x.getString(x.fieldIndex("className")),
      x.getString(x.fieldIndex("functionName")),
      x.getString(x.fieldIndex("udfName")))
  }
}


import org.apache.spark.sql.SparkSession
import scala.reflect.runtime.universe

private def registerDynamicUDF(objectName: String, udfFunction: String, udfName: String) = {
  val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
  val moduleSymbol = runtimeMirror.moduleSymbol(Class.forName(objectName))
  // Find the method on the object whose name matches udfFunction
  val targetMethod = moduleSymbol.typeSignature.members
    .filter(x => x.isMethod && x.name.toString == udfFunction).head.asMethod
  // The reflective call returns Any, so `function` has static type Any
  val function = runtimeMirror.reflect(runtimeMirror.reflectModule(moduleSymbol).instance)
    .reflectMethod(targetMethod)()
  SparkSession.builder.getOrCreate().udf.register(udfName, function)
}

I have stored the object name, the UDF name (for registration), and the function name in a table, from which I create a DataFrame and iterate over its rows. Now when I use

SparkSession.builder.getOrCreate().udf.register(udfName, function) 

it gives me an error saying it cannot register a UDF with type Any, and if I try to cast it to UserDefinedFunction, I get a ClassCastException. What should I do? Any ideas?
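
For reference, here is a minimal sketch of one direction that might work. It is not a confirmed fix. The reflective call returns a value whose static type is Any, while the object actually returned is a plain Scala Function2 and not a UserDefinedFunction, which would explain both errors. The sketch narrows the reflected value to (String, String) => String before it is handed to Spark. The names TypedUdfLookup, loadUdf2 and Demo are hypothetical, the arity and the String types are assumptions taken from the single UDF shown above, and the Spark call appears only as a comment so that the block runs without a Spark dependency.

```scala
import scala.reflect.runtime.universe

// Stand-in for the UDF holder from the question.
object UdfBody {
  def udfUpperCase() = { (s: String, sq: String) => s.concat(sq) }
}

// Hypothetical helper, not part of Spark or of the original code.
object TypedUdfLookup {
  // Reflectively invokes the zero-argument method `udfFunction` on the Scala
  // object `objectName` and narrows the result from Any to a typed function.
  def loadUdf2(objectName: String, udfFunction: String): (String, String) => String = {
    val mirror = universe.runtimeMirror(getClass.getClassLoader)
    val moduleSymbol = mirror.staticModule(objectName)
    val instance = mirror.reflectModule(moduleSymbol).instance
    val method = moduleSymbol.typeSignature.members
      .find(m => m.isMethod && m.name.toString == udfFunction)
      .getOrElse(throw new NoSuchMethodException(s"$objectName.$udfFunction"))
      .asMethod
    mirror.reflect(instance).reflectMethod(method)() match {
      // Type arguments are erased at runtime, so only the arity is checked
      // here; the String types are an unchecked assumption.
      case f: Function2[_, _, _] => f.asInstanceOf[(String, String) => String]
      case other =>
        throw new IllegalArgumentException(
          s"Expected a two-argument function, got ${other.getClass.getName}")
    }
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val f = TypedUdfLookup.loadUdf2("UdfBody", "udfUpperCase")
    println(f("foo", "bar"))
    // With a typed function value, the registration from the question
    // should type-check, since the argument is no longer Any:
    //   SparkSession.builder.getOrCreate().udf.register(udfName, f)
  }
}
```

Because the cast is unchecked, a UDF with other argument or return types would only fail when Spark calls it. If the table holds UDFs with different signatures, it would probably need an extra column describing the arity and types, so that the matching branch can be chosen per row.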




