I have a Spark streaming app in which I need to apply custom transformations to streaming data. These transformations are defined as methods of a class that is delivered to us as a jar file. We need to wrap these methods in UDFs and call them from our Spark code. A sample set of transformations might look like the following (keep in mind that it arrives as a jar).
import java.io.Serializable;

public class CustomTransformations implements Serializable {
    public String f1(String input) {
        return input + "_1";
    }

    public String f2(String input) {
        return input + "_2";
    }

    public String f3(String input) {
        return input + "_3";
    }
}
Let's assume that somewhere (e.g. in a JSON or config file) we have a map from transformation names to the corresponding method names (strings), so that, given a transformation, we can wrap the corresponding method in a UDF and invoke it. I created the following class for this purpose.
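import java.lang.reflect.Method;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;

public class Creator {
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
            throws NoSuchMethodException {
        // Look up the method by name only; this is the call that throws
        Method method = ct.getClass().getDeclaredMethod(funcName);
        return udf(
                (UDF1<String, Object>) method::invoke, DataTypes.StringType);
    }
}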
So far there is no compilation error. The problem is that calling getUDF from the Spark code throws a NoSuchMethodException. My Spark code looks something like the following.
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.col;

public class SampleSparkJob {
    public static void main(String[] args) {
        SparkSession.Builder sparkSessionBuilder = SparkSession.builder()
                .master("local[2]")
                .appName("sample-streaming");
        CustomTransformations ct = new CustomTransformations();
        try (SparkSession spark = sparkSessionBuilder.getOrCreate()) {
            Dataset<Row> df1 = MyKafkaConnectors.readFromKafka();
            // this is where I get the exceptions
            Dataset<Row> df2 = df1
                    .withColumn("value", Creator.getUDF(ct, "f1").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f2").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f3").apply(col("value")));
            StreamingQuery query = MyKafkaConnectors.WriteToKafka(df2);
            query.awaitTermination();
        } catch (TimeoutException | StreamingQueryException | NoSuchMethodException e) {
            e.printStackTrace();
        }
    }
}
This is the error that I get:
java.lang.NoSuchMethodException: <pkgname>.CustomTransformations.f1()
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
at Creator.getUDF(Creator.java:14)
at SampleSparkJob.main(SampleSparkJob.java:29)
Clearly, the client's CustomTransformations class has a method f1, so I don't understand why this error is thrown. Any help is appreciated.
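One thing worth checking, going by the f1() with an empty parameter list in the exception message: Class.getDeclaredMethod(name) with no further arguments only matches a method that takes no parameters, whereas f1 takes a String. Separately, Method.invoke expects the target instance as its first argument, followed by the call arguments. The following is a minimal, Spark-free sketch of both points; SampleTransformations and ReflectionLookupSketch are hypothetical stand-ins made up for illustration, not the classes from the jar.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical stand-in for the class shipped in the jar, reduced to one method.
class SampleTransformations {
    public String f1(String input) {
        return input + "_1";
    }
}

class ReflectionLookupSketch {

    // Resolves a method by name AND parameter types; f1 takes one String.
    static Method resolve(Object target, String funcName) throws NoSuchMethodException {
        return target.getClass().getDeclaredMethod(funcName, String.class);
    }

    // Invokes the method: the first argument to invoke is the receiver object,
    // the remaining arguments are passed to the method itself.
    static String call(Object target, Method method, String input)
            throws IllegalAccessException, InvocationTargetException {
        return (String) method.invoke(target, input);
    }

    // Returns true when a lookup without parameter types fails,
    // which is the situation shown in the stack trace.
    static boolean zeroArgLookupFails(Object target, String funcName) {
        try {
            target.getClass().getDeclaredMethod(funcName);
            return false;
        } catch (NoSuchMethodException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        SampleTransformations ct = new SampleTransformations();
        System.out.println(zeroArgLookupFails(ct, "f1")); // prints "true"
        Method f1 = resolve(ct, "f1");
        System.out.println(call(ct, f1, "value"));        // prints "value_1"
    }
}
```

If this is indeed the cause, the lookup in getUDF would become getDeclaredMethod(funcName, String.class), and the UDF body would call method.invoke(ct, input) rather than method::invoke, which treats the column value as the receiver. One further caveat: java.lang.reflect.Method is not Serializable, so capturing it in the UDF lambda may fail once Spark serializes the closure; resolving the method inside the lambda from the method name is one possible way around that.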