import org.springframework.stereotype.Service;
import java.io.Serializable;
@Service
class Job implements Serializable
{
private Dataset<String> getPropIdValueEqualsDataset(final SparkSession sparkSession, final Dataset<Root> dataset, final String propId)
{
Map<String, Field> propFieldMap = Arrays.stream(FieldUtils.getAllFields(Root.class)).collect(Collectors.toMap(Field::getName, Function.identity()));
Dataset<String> strDataset = dataset.flatMap(new FlatMapFunction<Root, String>()
{
@Override
public Iterator<String> call(Root root) throws Exception
{
List<String> list = new ArrayList<>();
Field dataField = propFieldMap.get(propId);
dataField.setAccessible(true);
String dataFieldValue = (String)dataField.get(root);
list.add(dataFieldValue);
return list.iterator();
}
}, Encoders.bean(String.class));
return strDataset;
}
}
I am getting these two errors
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: java.lang.reflect.Field Serialization stack:
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.app.Job$$EnhancerBySpringCGLIB$$96d07abc
How do I fix this error?
Aucun commentaire:
Enregistrer un commentaire