wake-up-neo.com

Spark Dataset API - Join

Ich versuche, die Spark Dataset API zu verwenden, aber ich habe einige Probleme beim Ausführen eines einfachen Joins.

Angenommen, ich habe zwei Datensätze mit Feldern: date | value, dann würde mein Join im Fall von DataFrame so aussehen:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

Für Dataset gibt es jedoch das .joinWith Methode, aber der gleiche Ansatz funktioniert nicht:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

Was ist das Argument für .joinWith?

20
mastro

Um joinWith zu verwenden, müssen Sie zuerst ein DataSet erstellen, höchstwahrscheinlich zwei davon. Um ein DataSet zu erstellen, müssen Sie eine Fallklasse erstellen, die Ihrem Schema entspricht, und DataFrame.as[T] Aufrufen, wobei T Ihre Fallklasse ist. So:

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.Apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

Sie können auch die Fallklasse überspringen und ein Tupel verwenden:

val tupDs = df.as[(Int,String)]
// org.Apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Wenn Sie dann eine andere Fallklasse/DF hatten, sagen Sie:

case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.Apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Während die Syntax von join und joinWith ähnlich ist, sind die Ergebnisse unterschiedlich:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

Wie Sie sehen, werden die Objekte durch joinWith als Teile eines Tupels intakt gelassen, während durch join die Spalten zu einem einzigen Namespace zusammengefasst werden. (Dies führt im obigen Fall zu Problemen, da der Spaltenname "key" wiederholt wird.)

Seltsamerweise muss ich df.col("key") und df2.col("key") verwenden, um die Bedingungen für den Beitritt zu ds und ds2 Zu erstellen - wenn Sie nur col("key") auf beiden Seiten funktioniert es nicht und ds.col(...) existiert nicht. Verwenden Sie jedoch das ursprüngliche df.col("key").

30
David Griffin

From https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html

es sieht so aus, als könntest du es einfach tun

dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )

Im obigen Beispiel können Sie die folgende Option versuchen -

  • Definieren Sie eine Fallklasse für Ihre Ausgabe

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • Verbinden Sie zwei Datensätze mit "Seq (" key ")", um zu vermeiden, dass die Ausgabe zwei doppelte Schlüsselspalten enthält. Welche helfen, die Fallklasse anzuwenden oder die Daten im nächsten Schritt abzurufen

    ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.Apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+

2
Syntax