wake-up-neo.com

Encoder für Zeilentyp Spark Datasets

Ich möchte einen Encoder für einen Row Typ in DataSet schreiben, für einen Kartenvorgang, den ich mache. Grundsätzlich verstehe ich nicht, wie man Encoder schreibt.

Unten sehen Sie ein Beispiel für eine Kartenoperation:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public Iterator<String> call(Row row) throws Exception {

                ArrayList<String> obj = //some map operation
                return obj.iterator();
            }
        },Encoders.STRING());

Ich verstehe, dass anstelle eines Strings der Encoder wie folgt geschrieben werden muss:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };

Ich verstehe jedoch das clsTag () im Encoder nicht und versuche, ein laufendes Beispiel zu finden, das etwas Ähnliches demostrieren kann (d. H. Einen Encoder für einen Zeilentyp).

Bearbeiten - Dies ist keine Kopie der genannten Frage: Encoderfehler beim Zuordnen einer Datenrahmenzeile zu einer aktualisierten Zeile In der Antwort wird die Verwendung von Spark 1.x in Spark 2.x (mache ich nicht), außerdem suche ich nach einem Encoder für eine Row-Klasse, anstatt einen Fehler zu beheben. Schließlich suchte ich nach einer Lösung in Java, nicht in Scala.

23
tsar2512

Die Antwort ist, ein RowEncoder und das Schema des Datensatzes mit StructType zu verwenden.

Nachfolgend finden Sie ein Arbeitsbeispiel für eine Flatmap-Operation mit Datasets:

    StructType structType = new StructType();
    structType = structType.add("id1", DataTypes.LongType, false);
    structType = structType.add("id2", DataTypes.LongType, false);

    ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

    Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
        @Override
        public Iterator<Row> call(Row row) throws Exception {
            // a static map operation to demonstrate
            List<Object> data = new ArrayList<>();
            data.add(1l);
            data.add(2l);
            ArrayList<Row> list = new ArrayList<>();
            list.add(RowFactory.create(data.toArray()));
            return list.iterator();
        }
    }, encoder);
24
tsar2512

Ich hatte das gleiche Problem ... Encoders.kryo(Row.class)) arbeitete für mich.

Als Bonus beziehen sich die Apache Spark Tuning-Dokumente auf Kryo, da es bei der Serialisierung "oft bis zu 10x" schneller ist:

https://spark.Apache.org/docs/latest/tuning.html

5
Jim Bob