Friday, February 3, 2023

Return Row with schema defined at runtime in Spark UDF

I've dulled my sword on this one; some help would be greatly appreciated!

Background

I am building an ETL pipeline that takes gNMI Protobuf update messages off of a Kafka queue and eventually breaks them out into a number of Delta tables based on the prefix and parameters of the paths to the values (this runs on the Databricks runtime).

Without going into the gory details, each prefix corresponds roughly to the schema for a table, with the caveat that the paths can change upstream (usually new subtrees are added), so the schema is not fixed. It is similar to a nested JSON structure.

I first break out the updates by prefix, so all of the updates in a group have roughly the same schema. I have defined some transformations so that, when the schemas do not match exactly, I can coerce them into a common schema.
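For illustration, the kind of schema coercion I mean looks roughly like this. It's only a sketch, and the field names (ifName, inOctets, outOctets) are invented:

  import org.apache.spark.sql.types._

  // Sketch: merge two per-prefix schemas into a common superset, keeping the
  // order of the first schema and appending any fields only the second one has.
  def mergeSchemas(a: StructType, b: StructType): StructType = {
    val known = a.fieldNames.toSet
    StructType(a.fields ++ b.fields.filterNot(f => known.contains(f.name)))
  }

  // Invented example: a new subtree appeared upstream, adding outOctets.
  val older  = StructType(Seq(StructField("ifName", StringType), StructField("inOctets", LongType)))
  val newer  = older.add(StructField("outOctets", LongType))
  val common = mergeSchemas(older, newer)  // ifName, inOctets, outOctets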

I'm running into trouble when I try to create a struct column with the common schema.

Attempt 1

I first tried just returning an Array[Any] from my UDF and providing a schema in the UDF definition (I know this is deprecated):

  // Look up each target column and coerce its value to that column's type;
  // the output order follows columns.keys.
  def mapToRow(deserialized: Map[String, ParsedValueV2]): Array[Any] = {
    def getValue(key: String): Any = {
      deserialized.get(key) match {
        case Some(value) => value.asType(columns(key))
        case None        => None
      }
    }

    columns.keys.toArray.map(getValue)
  }

  spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
  def mapToStructUdf = F.udf(mapToRow _, account.sparkSchemas(prefix))

This snippet creates an Array object with the typed values that I need. Unfortunately, when I try to use the UDF, I get this error:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line8760b7c10da04d2489451bb90ca42c6535.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ParsedValueV2

I'm not sure what isn't matching, but I did notice that the types of the values are Java types, not Scala types, so perhaps that is related?
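For what it's worth, my working theory (which may be wrong) is that with the untyped Scala UDF the struct values inside the Map arrive as generic Rows rather than as my ParsedValueV2 case class, and that a struct result also has to go back out as a Row. Something like the sketch below is what I think Spark expects; the "value" field name is just a placeholder:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.{functions => F}

  // Sketch only: accept the structs as Rows and build the result with Row.fromSeq.
  def mapToRowSketch(deserialized: Map[String, Row]): Row = {
    val values = columns.keys.toSeq.map { key =>
      deserialized.get(key) match {
        case Some(row) => row.getAs[Any]("value") // placeholder field name
        case None      => null                    // null (not None) so Spark writes a SQL NULL
      }
    }
    Row.fromSeq(values)
  }

  spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
  val mapToStructUdfSketch = F.udf(mapToRowSketch _, account.sparkSchemas(prefix))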

Attempt 2

Maybe I can use the typed UDF interface after all? Can I create a case class at runtime for each schema and then use that as the return value from my UDF?

I've tried to get this to work using various snippets I found, like this:

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
val test = tb.eval(tb.parse("object Test; Test"))

but I can't even get at an instance of Test, and I can't figure out how to use it as the return value of a UDF. I presume I need to use a generic type somehow, but my Scala-fu is too weak to figure this one out.
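My (possibly mistaken) reading of the typed udf signatures is that this can't work anyway: the typed overloads need a TypeTag for the return type at compile time, and anything that comes out of the toolbox is only statically typed as Any, so there is nothing for Spark to derive a schema from. A small sketch of what I mean:

  import scala.reflect.runtime.universe
  import scala.tools.reflect.ToolBox

  val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()

  // The toolbox can define and instantiate a case class at runtime...
  val instance = tb.eval(tb.parse("""case class Test(a: Int, b: String); Test(1, "x")"""))

  // ...but its static type here is just Any, so there is no TypeTag[Test] in scope
  // and the typed F.udf overloads cannot infer a struct schema from it.
  println(instance.getClass) // a synthetic, toolbox-generated class name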

Finally, the question

Can someone help me figure out which approach to take, and how to proceed with it?

Thanks in advance for your help!!!




