Sunday, November 8, 2020

How to resolve NoSuchMethodException in Java when calling a custom Spark UDF

I have a Spark Streaming app in which I need to apply custom transformations to the streaming data. These transformations are defined as methods of a class that is delivered to us as a jar file. We need to wrap these methods in UDFs and call them from our Spark code. A sample set of transformations could look like the following (remember that in practice it comes as a jar):

import java.io.Serializable;

public class CustomTransformations implements Serializable {
    public String f1(String input) {
        return input + "_1";
    }

    public String f2(String input) {
        return input + "_2";
    }

    public String f3(String input) {
        return input + "_3";
    }
}

Let's assume that somewhere (e.g., a JSON or config file) we have a map from transformations to their corresponding method names (as Strings), so that, given a transformation, we can wrap the corresponding method in a UDF and invoke it. I created the following class for this purpose:
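A minimal plain-Java sketch of such a lookup, run outside Spark, may help make the setup concrete. It assumes the mapping has already been parsed into an insertion-ordered LinkedHashMap (transformation name to method name) and applies the methods in that order to a single string. TransformationChain and applyInOrder are hypothetical names, and the small CustomTransformations copy is only a stand-in for the class in the jar. Note that the lookup passes the parameter type String.class along with the method name.

```java
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Map;

// Stand-in for the class shipped in the jar (same methods as above).
class CustomTransformations implements Serializable {
    public String f1(String input) { return input + "_1"; }
    public String f2(String input) { return input + "_2"; }
    public String f3(String input) { return input + "_3"; }
}

// Hypothetical helper: applies the configured methods in the map's iteration order.
class TransformationChain {
    static String applyInOrder(CustomTransformations ct,
                               Map<String, String> transformationToMethod,
                               String input) {
        String value = input;
        for (Map.Entry<String, String> entry : transformationToMethod.entrySet()) {
            try {
                // Every transformation takes exactly one String argument,
                // so the parameter type is part of the lookup.
                Method m = ct.getClass().getMethod(entry.getValue(), String.class);
                value = (String) m.invoke(ct, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                        "Cannot apply transformation " + entry.getKey(), e);
            }
        }
        return value;
    }
}
```

With three entries mapped to f1, f2 and f3 in that order, applyInOrder(ct, map, "abc") returns "abc_1_2_3". An unknown method name surfaces as an IllegalArgumentException that names the offending transformation.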

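import java.lang.reflect.Method;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;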

public class Creator {
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
            throws NoSuchMethodException {
        Method method = ct.getClass().getDeclaredMethod(funcName);
        return udf(
                (UDF1<String, Object>) method::invoke, DataTypes.StringType);
    }
}

So far there are no compilation errors. But when I call getUDF from the Spark code, it throws a NoSuchMethodException. For example, my Spark code looks something like the following:

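import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.col;

public class SampleSparkJob {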
    public static void main(String[] args) {
        SparkSession.Builder sparkSessionBuilder = SparkSession.builder()
                .master("local[2]")
                .appName("sample-streaming");

        CustomTransformations ct = new CustomTransformations();
        try (SparkSession spark = sparkSessionBuilder.getOrCreate()) {
            Dataset<Row> df1 = MyKafkaConnectors.readFromKafka();

            // this is where I get the exceptions
            Dataset<Row> df2 = df1
                    .withColumn("value", Creator.getUDF(ct, "f1").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f2").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f3").apply(col("value")));

            StreamingQuery query = MyKafkaConnectors.WriteToKafka(df2);
            query.awaitTermination();
        } catch (TimeoutException | StreamingQueryException | NoSuchMethodException e) {
            e.printStackTrace();
        }
    }
}

This is the error that I get:

java.lang.NoSuchMethodException: <pkgname>.CustomTransformations.f1()
    at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
    at Creator.getUDF(Creator.java:14)
    at SampleSparkJob.main(SampleSparkJob.java:29)

The client's CustomTransformations class clearly has a method f1, so I don't understand why this error is thrown. Any help is appreciated.
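The empty parentheses in the message, CustomTransformations.f1(), are the parameter list that was searched for. Class.getDeclaredMethod(String name, Class<?>... parameterTypes) matches on the name and the parameter types together, so getDeclaredMethod(funcName) looks for a zero-argument f1(), which the class does not declare. Since f1 takes a String, the lookup needs getDeclaredMethod(funcName, String.class). Two further problems would likely surface once the lookup succeeds. First, casting method::invoke to UDF1<String, Object> binds the row value to the obj parameter of Method.invoke(Object obj, Object... args): the input string becomes the receiver and no argument is passed, so each call would fail with an IllegalArgumentException. The instance has to be passed explicitly, as in method.invoke(ct, s). Second, java.lang.reflect.Method is not Serializable, and Spark serializes UDF functions to ship them to executors, so a lambda that captures a Method would be expected to fail with a "Task not serializable" error.

The sketch below uses plain Java and nothing beyond the JDK. It shows the failing zero-argument lookup and one possible workaround: a wrapper that keeps only serializable state (the target object and the method name) and re-resolves the Method lazily into a transient field. ReflectiveStringFunction and LookupDemo are hypothetical names, CustomTransformations is a one-method stand-in, and the serialization round trip only approximates what Spark does when it ships a task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;

// Stand-in for the jar's class; only f1 is needed here.
class CustomTransformations implements Serializable {
    public String f1(String input) { return input + "_1"; }
}

// Hypothetical serializable wrapper around one String -> String method.
class ReflectiveStringFunction implements Serializable {
    private final Object target;       // must itself be Serializable
    private final String methodName;
    private transient Method method;   // Method is not Serializable; re-resolved after deserialization

    ReflectiveStringFunction(Object target, String methodName) throws NoSuchMethodException {
        this.target = target;
        this.methodName = methodName;
        resolve(); // fail fast if no such method(String) exists
    }

    private Method resolve() throws NoSuchMethodException {
        if (method == null) {
            // The parameter types are part of the lookup: f1(String), not f1().
            method = target.getClass().getMethod(methodName, String.class);
        }
        return method;
    }

    public String call(String input) throws Exception {
        // The instance is the receiver; the row value is the single argument.
        return (String) resolve().invoke(target, input);
    }
}

class LookupDemo {
    // True if the zero-argument lookup from the question fails.
    static boolean zeroArgLookupFails() {
        try {
            CustomTransformations.class.getDeclaredMethod("f1");
            return false;
        } catch (NoSuchMethodException e) {
            return true; // same situation as the stack trace in the question
        }
    }

    // Java-serialization round trip, roughly what shipping a closure involves.
    static ReflectiveStringFunction roundTrip(ReflectiveStringFunction fn)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ReflectiveStringFunction) in.readObject();
        }
    }
}
```

With Spark on the classpath, such a wrapper could be plugged into the existing udf(UDF1, DataType) overload, for example udf((UDF1<String, String>) fn::call, DataTypes.StringType). Alternatively, ReflectiveStringFunction could implement UDF1<String, String> directly, since that interface's only method is call(T1). Both variants are untested assumptions about the Spark side rather than a verified integration.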
