I want to generate some Kafka Streams code dynamically at runtime, but I get an exception when I compile the following code with the Scala ToolBox eval:
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

// Runtime compiler backed by the application's class loader
val toolbox = runtimeMirror(getClass.getClassLoader).mkToolBox()
val code =
  """
    |import org.apache.kafka.streams.scala.kstream.KStream
    |import org.apache.kafka.streams.scala.StreamsBuilder
    |
    |class ClassA {
    |  def createStream(): KStream[String, String] = {
    |    import org.apache.kafka.streams.scala.ImplicitConversions._
    |    import org.apache.kafka.streams.scala.Serdes._
    |    new StreamsBuilder().stream("TestTopic")
    |  }
    |}
    |scala.reflect.classTag[ClassA].runtimeClass
  """.stripMargin
// Parse and compile the snippet; this is the call that throws
toolbox.eval(toolbox.parse(code))
Error:
reflective compilation has failed:
illegal cyclic reference involving class InterfaceStability
scala.tools.reflect.ToolBoxError: reflective compilation has failed:
illegal cyclic reference involving class InterfaceStability
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.throwIfErrors(ToolBoxFactory.scala:331)
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.wrapInPackageAndCompile(ToolBoxFactory.scala:213)
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:267)
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:444)
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:370)
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:437)
at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.eval(ToolBoxFactory.scala:459)
It seems that Kafka's InterfaceStability class is annotated with one of its own nested annotations:
package org.apache.kafka.common.annotation;
@InterfaceStability.Evolving
public class InterfaceStability {
...
}
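To make the suspected cause concrete, here is a minimal standalone sketch of the same self-annotation pattern. The names SelfAnnotated, Evolving and annotatesItself are hypothetical, and the RUNTIME retention is my own assumption, added only so the annotation can be observed through reflection; none of this is copied from Kafka's source. As far as I can tell javac accepts this shape, which suggests the cyclic reference arises when the Scala compiler reads the class rather than in the Java code itself.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical stand-in for InterfaceStability: the outer class is
// annotated with an annotation type that it declares itself.
@SelfAnnotated.Evolving
class SelfAnnotated {

    // Assumption: RUNTIME retention, so reflection can see the annotation.
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @interface Evolving {}

    // Returns true when the outer class carries its own nested annotation.
    static boolean annotatesItself() {
        return SelfAnnotated.class.isAnnotationPresent(Evolving.class);
    }
}
```

Calling SelfAnnotated.annotatesItself() checks through reflection that the class really carries its own nested annotation.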
Scala version: 2.12.8
Kafka version: 2.1.0
sbt version: 1.2.7
Can anyone help? Thank you very much.