I have an object in which I have written my UDF:
object UdfBody {
  def udfUpperCase() = { (s: String, sq: String) => s.concat(sq) }
}
This object lives in a different module, and I want to use the UDF in my Spark processing. So I am using reflection to look up the UDF and trying to register it like this:
def registerUdfFunc() = {
  // Each row of UDF_TABLE names an object, a method on it, and the name to register the UDF under.
  val udfDf = DataFrameBuilder.getDataFrame("UDF_TABLE", "RDBMS", null, null, null)
  udfDf.collect.foreach { x =>
    registerDynamicUDF(
      x.getString(x.fieldIndex("className")),
      x.getString(x.fieldIndex("functionName")),
      x.getString(x.fieldIndex("udfName")))
  }
}
private def registerDynamicUDF(objectName: String, udfFunction: String, udfName: String) = {
  val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
  val moduleSymbol = runtimeMirror.moduleSymbol(Class.forName(objectName))
  // Find the method named udfFunction on the object.
  val targetMethod = moduleSymbol.typeSignature.members.filter(x =>
    x.isMethod && x.name.toString == udfFunction).head.asMethod
  // Invoke it reflectively; the result is statically typed as Any.
  val function = runtimeMirror.reflect(runtimeMirror.reflectModule(moduleSymbol).instance).reflectMethod(targetMethod)()
  SparkSession.builder.getOrCreate().udf.register(udfName, function)
}
I have stored the object name, the UDF name (used for registration), and the function name in a table, from which I create a DataFrame and iterate over its rows. Now, when I call
SparkSession.builder.getOrCreate().udf.register(udfName, function)
it gives me an error saying that it cannot register a UDF with type Any, and if I try to cast the value to UserDefinedFunction, it throws a ClassCastException. What should I do? Any ideas?
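To make the symptom concrete, here is a minimal sketch (Scala 2 with scala-reflect, no Spark required) of the reflective call in isolation. It shows that the value returned by reflectMethod(...)() is typed as Any and that it is a plain Scala function, not a UserDefinedFunction, which would explain both errors. The names ReflectUdfSketch and loadStringFunction2 are hypothetical, and the sketch uses staticModule to look the object up by name instead of moduleSymbol(Class.forName(...)); it also assumes the caller already knows the function takes two Strings and returns a String.

```scala
import scala.reflect.runtime.universe

// Stand-in for the UDF holder from the question.
object UdfBody {
  def udfUpperCase() = { (s: String, sq: String) => s.concat(sq) }
}

object ReflectUdfSketch {
  // Hypothetical helper: finds a zero-argument method on a top-level Scala object,
  // invokes it, and casts the result to a (String, String) => String function.
  def loadStringFunction2(objectName: String, methodName: String): (String, String) => String = {
    val mirror = universe.runtimeMirror(getClass.getClassLoader)
    val moduleSymbol = mirror.staticModule(objectName)
    val instance = mirror.reflectModule(moduleSymbol).instance
    val method = moduleSymbol.typeSignature.member(universe.TermName(methodName)).asMethod
    // The reflective call returns Any; the compiler knows nothing more about it.
    val result: Any = mirror.reflect(instance).reflectMethod(method)()
    // Assumption: the signature is known in advance. The cast is only checked
    // against Function2 at runtime because type parameters are erased.
    result.asInstanceOf[(String, String) => String]
  }

  def main(args: Array[String]): Unit = {
    val f = loadStringFunction2("UdfBody", "udfUpperCase")
    println(f("foo", "bar")) // prints "foobar"
  }
}
```

Once the value has a static function type like this, passing it to udf.register(udfName, f) should compile, since that overload derives the argument and return types from the function's compile-time type. This only helps when the signature is known ahead of time; a table of UDFs with differing signatures would need one such cast per supported shape.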