wake-up-neo.com

So erstellen Sie einen benutzerdefinierten Encoder in Spark 2.X-Datensätzen

Funken-Datensätze verschieben sich von Rows zu Encoders für Pojos/Primitiven. Die Catalyst-Engine verwendet eine ExpressionEncoder zum Konvertieren von Spalten in einem SQL-Ausdruck. Es scheint jedoch keine anderen Unterklassen von Encoder zu geben, die als Vorlage für unsere eigenen Implementierungen verwendet werden können. 

Hier ist ein Beispiel für Code, der in Spark 1.X/DataFrames glücklich ist und in dem neuen Regime nicht kompiliert wird:

//mapping each row to RDD Tuple
df.map(row => {
    var id: String = if (!has_id) "" else row.getAs[String]("id")
    var label: String = row.getAs[String]("label")
    val channels  : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height  : Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data : Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[[email protected]] => arr
      case _ => {
        log.error("Unsupport value type")
        null
      }
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

}

Wir erhalten einen Compiler-Fehler von 

Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

Also dann/irgendwo sollte es ein Mittel geben 

  • Definieren/Implementieren Sie unseren benutzerdefinierten Encoder
  • Wenden Sie es an, wenn Sie ein Mapping auf die DataFrame durchführen (die jetzt ein Dataset vom Typ Row ist)
  • Registrieren Sie den Encoder für die Verwendung durch anderen benutzerdefinierten Code

Ich suche nach Code, der diese Schritte erfolgreich ausführt.

17
javadba

Soweit mir bekannt ist, hat sich seit 1.6 und den in Wie werden benutzerdefinierte Objekte in Dataset gespeichert? sind die einzigen verfügbaren Optionen. Trotzdem sollte Ihr aktueller Code mit Standard-Encodern für Produkttypen gut funktionieren.

Um zu verstehen, warum Ihr Code in 1.x funktioniert hat und in 2.0.0 möglicherweise nicht funktioniert, müssen Sie die Signaturen überprüfen. In 1.x ist DataFrame.map eine Methode, die die Funktion Row => T übernimmt und RDD[Row] in RDD[T] umwandelt.

In 2.0.0 übernimmt DataFrame.map auch eine Funktion des Typs Row => T, wandelt jedoch Dataset[Row] (a.k.a DataFrame) in Dataset[T] um, daher erfordert T eine Encoder. Wenn Sie das "alte" Verhalten erhalten möchten, sollten Sie RDD explizit verwenden:

df.rdd.map(row => ???)

Für Dataset[Row]map siehe Encoder-Fehler beim Zuordnen der Dataframe-Zeile zur aktualisierten Zeile

17
zero323

Haben Sie die impliziten Encoder importiert?

import spark.implicits._

http://spark.Apache.org/docs/2.0.0-preview/api/scala/index.html#org.Apache.spark.sql.Encoder

3
eyal edelman

Ich habe spark.implicits._ importiert. Wo Spark die SparkSession ist, wurde der Fehler behoben, und benutzerdefinierte Encoder wurden importiert.

Auch das Schreiben eines benutzerdefinierten Encoders ist ein Ausweg, den ich nicht ausprobiert habe.

Arbeitslösung: - Erstellen Sie SparkSession und importieren Sie Folgendes

import spark.implicits._

0
Valan Aravind