I am trying to save some Spark RDDs to HDFS in Avro format. At the moment, I have a trait that defines two methods:
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.avro.specific.SpecificRecordBase
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

trait MyTrait {
  type T <: SpecificRecordBase

  def getData(): RDD[(AvroKey[T], NullWritable)] // overridden in other objects

  def write(data: RDD[(AvroKey[T], NullWritable)]) = {
    // ...
    val job = Job.getInstance(...)
    job.setOutputFormatClass(classOf[AvroKeyOutputFormat[T]])
    AvroJob.setOutputKeySchema(job, T.SCHEMA$) // how to do this?
    data.saveAsNewAPIHadoopFile(
      outputDirectory,
      classOf[AvroKey[T]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[T]],
      job.getConfiguration
    )
  }
}
Each object extending the trait MyTrait will override the type T (according to the schema it uses) and the method getData.
I am stuck on the line
AvroJob.setOutputKeySchema(job, T.SCHEMA$) // how to do this?
It doesn't compile, since T is an abstract type member rather than a value. I have read the Scala Reflection documentation page, but it's not 100% clear to me how to use it to access a static member such as SCHEMA$ (I could call the instance method getSchema instead, but I couldn't get that to work either).
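One possible direction, sketched under assumptions rather than offered as a confirmed fix: T is erased at runtime, so the code needs a ClassTag to recover the concrete class, after which plain Java reflection can read a public static field. Avro-generated specific record classes expose their schema as the public static field SCHEMA$. The helper name StaticFieldAccess.staticField is hypothetical, and the example reads a JDK constant so that it runs without Avro on the classpath.

```scala
import scala.reflect.{ClassTag, classTag}

object StaticFieldAccess {
  // Hypothetical helper: reads a public static field from the runtime class of T.
  // The ClassTag context bound carries the class that erasure would otherwise drop.
  def staticField[T: ClassTag](name: String): AnyRef =
    classTag[T].runtimeClass
      .getField(name) // throws NoSuchFieldException if the field is missing or not public
      .get(null)      // a null receiver is allowed because the field is static

  def main(args: Array[String]): Unit = {
    // Stand-in for an Avro class: java.lang.Integer has the public static field MAX_VALUE.
    println(staticField[java.lang.Integer]("MAX_VALUE"))
  }
}
```

Inside MyTrait, assuming each implementing object supplies an implicit ClassTag[T], the result of staticField[T]("SCHEMA$") could be cast to org.apache.avro.Schema and passed to AvroJob.setOutputKeySchema. Avro's SpecificData.get().getSchema, called with the runtime class, may be an alternative that avoids the cast.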