What is Needed:
The number of tables in the source database changes rapidly, so I don't want to edit case classes by hand. Instead, I generate them dynamically with Scala code and put them in a package. However, I am not able to read them dynamically. If this works, I would fill in "com.example.datasources.fileSystemSource.schema.{}" with each member of the object schema in a loop.
What has already been Done:
I have some case classes dynamically generated from the schema of the database tables, as below:
import java.sql.Timestamp

object schema {
  case class Users(name: String,
                   favorite_color: String,
                   favorite_numbers: Array[Int])

  case class UserData(registration_dttm: Timestamp,
                      id: Int,
                      first_name: String,
                      last_name: String,
                      email: String,
                      gender: String,
                      ip_address: String,
                      cc: String,
                      country: String,
                      birthdate: String,
                      salary: Double,
                      title: String,
                      comments: String)
}
Then I used them as the type parameter T of the Load[T] class in my Loader.scala, as below:
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

class Load[T <: Product: Encoder](val tableName: String,
                                  val inputPath: String,
                                  val spark: SparkSession,
                                  val saveMode: String,
                                  val outputPath: String,
                                  val metadata: Boolean)
    extends Loader[T] {

  val fileSystemSourceInstance: FileSystem[T] =
    new FileSystem[T](inputPath, spark, saveMode, tableName)

  override def Load: Dataset[T] =
    fileSystemSourceInstance.provideData(metadata, outputPath).as[T]
}
Now, using scala.reflect.api, I am able to get a TypeTag for my case classes:
import scala.reflect.api
import scala.reflect.runtime.universe._

// Builds a TypeTag from a fully qualified class name at runtime.
def stringToTypeTag[A](name: String): TypeTag[A] = {
  val c = Class.forName(name)
  val mirror = runtimeMirror(c.getClassLoader)
  val sym = mirror.staticClass(name)
  val tpe = sym.selfType
  TypeTag(mirror, new api.TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
      if (m eq mirror) tpe.asInstanceOf[U # Type]
      else throw new IllegalArgumentException(s"Type tag defined in $mirror cannot be migrated to other mirrors.")
  })
}
So if I now print the type tag of my case class, I get:
val typetagDynamic = stringToTypeTag("com.example.datasources.fileSystemSource.schema.Users")
println(typetagDynamic)
TypeTag[com.example.datasources.fileSystemSource.schema.Users]
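For the loop over the members of the object schema mentioned at the top, one possible approach is to list the case classes declared inside the object through runtime reflection. The following is only a minimal sketch under assumptions: the object schema here is a shortened stand-in declared at the top level (the real one lives in com.example.datasources.fileSystemSource), and SchemaMembers and caseClassNames are hypothetical names. It needs scala-reflect on the classpath.

```scala
import scala.reflect.runtime.universe._

// Shortened stand-in for the generated object (assumption: the real one
// is com.example.datasources.fileSystemSource.schema).
object schema {
  case class Users(name: String, favorite_color: String)
  case class UserData(id: Int, email: String)
}

// Hypothetical helper, not part of the original code.
object SchemaMembers {
  // Returns the fully qualified names of the case classes declared
  // directly inside the object named by `moduleName`, sorted by name.
  def caseClassNames(moduleName: String): List[String] = {
    val mirror = runtimeMirror(getClass.getClassLoader)
    val module = mirror.staticModule(moduleName)
    module.moduleClass.typeSignature.decls.toList.collect {
      case s if s.isClass && s.asClass.isCaseClass => s.fullName
    }.sorted
  }

  def main(args: Array[String]): Unit =
    // With the real package this would be
    // "com.example.datasources.fileSystemSource.schema".
    caseClassNames("schema").foreach(println)
}
```

Each returned name could then be passed to stringToTypeTag in a loop. One caveat: fullName separates nesting levels with dots, whereas Class.forName expects the JVM binary name, which uses $ for a class nested in an object (for example schema$Users), so the name may need adjusting before that call.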
Problem:
I need to use these TypeTags, or the dynamically generated case classes themselves, to encode my datasets as below:
new Load[typetagDynamic](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic]).Load
This gives me the error: Cannot resolve symbol typetagDynamic
If I use it like this instead:
new Load[typetagDynamic.type](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic.type]).Load
This gives me the error: type arguments [T] do not conform to method product's type parameter bounds [T <: Product]
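Both attempts fail for the same underlying reason: a type argument in square brackets must be a type the compiler knows, whereas typetagDynamic is a value created at runtime, and typetagDynamic.type is the singleton type of that value (a TypeTag, not a Product). One possible workaround, sketched here under assumptions, is to fix the type parameter to Product and pass the runtime TypeTag explicitly in place of the implicit one. The fieldNames method below is a hypothetical stand-in with the same shape as Encoders.product[T <: Product : TypeTag]; it needs only scala-reflect, not Spark, and ExplicitTagDemo is an illustrative name.

```scala
import scala.reflect.runtime.universe._

// Illustrative demo object, not part of the original code.
object ExplicitTagDemo {
  case class Users(name: String, favorite_color: String)

  // Hypothetical stand-in with the same type parameter shape as
  // Encoders.product: it reads everything it needs from the TypeTag.
  def fieldNames[T <: Product : TypeTag]: List[String] =
    typeOf[T].decls.sorted.collect {
      case m: MethodSymbol if m.isCaseAccessor => m.name.toString
    }

  def main(args: Array[String]): Unit = {
    // In the question this tag would come from stringToTypeTag[Product](name).
    // TypeTag is invariant, so the static type is widened with a cast;
    // the tag still carries the concrete type Users at runtime.
    val dynamicTag = typeTag[Users].asInstanceOf[TypeTag[Product]]

    // The static type argument is Product; the tag supplies the real type.
    println(fieldNames[Product](dynamicTag))
  }
}
```

With Spark, the analogous call would presumably be Encoders.product[Product](stringToTypeTag[Product](name)), with the result passed as the encoder to new Load[Product](...). This is untested against Spark: the TypeCreator in stringToTypeTag throws if the tag is resolved with a different mirror, so it may fail depending on the class loader Spark uses. The result would also be typed as Dataset[Product], so field access stays unchecked at compile time.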