Ich möchte einen Encoder für einen Row Typ in DataSet schreiben, für einen Kartenvorgang, den ich mache. Grundsätzlich verstehe ich nicht, wie man Encoder schreibt.
Unten sehen Sie ein Beispiel für eine Kartenoperation:
In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>
Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
@Override
public Iterator<String> call(Row row) throws Exception {
ArrayList<String> obj = //some map operation
return obj.iterator();
}
},Encoders.STRING());
Ich verstehe, dass anstelle eines Strings der Encoder wie folgt geschrieben werden muss:
Encoder<Row> encoder = new Encoder<Row>() {
@Override
public StructType schema() {
return join.schema();
//return null;
}
@Override
public ClassTag<Row> clsTag() {
return null;
}
};
Ich verstehe jedoch das clsTag () im Encoder nicht und versuche, ein laufendes Beispiel zu finden, das etwas Ähnliches demostrieren kann (d. H. Einen Encoder für einen Zeilentyp).
Bearbeiten - Dies ist keine Kopie der genannten Frage: Encoderfehler beim Zuordnen einer Datenrahmenzeile zu einer aktualisierten Zeile In der Antwort wird die Verwendung von Spark 1.x in Spark 2.x (mache ich nicht), außerdem suche ich nach einem Encoder für eine Row-Klasse, anstatt einen Fehler zu beheben. Schließlich suchte ich nach einer Lösung in Java, nicht in Scala.
Die Antwort ist, ein RowEncoder und das Schema des Datensatzes mit StructType zu verwenden.
Nachfolgend finden Sie ein Arbeitsbeispiel für eine Flatmap-Operation mit Datasets:
StructType structType = new StructType();
structType = structType.add("id1", DataTypes.LongType, false);
structType = structType.add("id2", DataTypes.LongType, false);
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
@Override
public Iterator<Row> call(Row row) throws Exception {
// a static map operation to demonstrate
List<Object> data = new ArrayList<>();
data.add(1l);
data.add(2l);
ArrayList<Row> list = new ArrayList<>();
list.add(RowFactory.create(data.toArray()));
return list.iterator();
}
}, encoder);
Ich hatte das gleiche Problem ... Encoders.kryo(Row.class))
arbeitete für mich.
Als Bonus beziehen sich die Apache Spark Tuning-Dokumente auf Kryo, da es bei der Serialisierung "oft bis zu 10x" schneller ist: