wake-up-neo.com

Verknüpfen von Spark Datenrahmen auf dem Schlüssel

Ich habe zwei Datenrahmen erstellt. Wie können wir mehrere Spark dataframes verbinden?

Beispielsweise :

PersonDf, ProfileDf mit einer gemeinsamen Spalte als personId als (Schlüssel). Wie können wir nun einen Datenrahmen haben, der PersonDf und ProfileDf kombiniert?

36
Bindumalini KK

Alias-Ansatz mit scala ( Dies ist ein Beispiel für eine ältere Version von spark für spark 2.x siehe meine andere Antwort ):

Sie können die Case-Klasse verwenden, um einen Beispieldatensatz vorzubereiten. Dies ist beispielsweise optional. Sie können DataFrame auch aus hiveContext.sql Abrufen.

import org.Apache.spark.sql.functions.col

case class Person(name: String, age: Int, personid : Int)

case class Profile(name: String, personid  : Int , profileDescription: String)

    val df1 = sqlContext.createDataFrame(
   Person("Bindu",20,  2) 
:: Person("Raphel",25, 5) 
:: Person("Ram",40, 9):: Nil)


val df2 = sqlContext.createDataFrame(
Profile("Spark",2,  "SparkSQLMaster") 
:: Profile("Spark",5, "SparkGuru") 
:: Profile("Spark",9, "DevHunter"):: Nil
)

// you can do alias to refer column name with aliases to  increase readablity

val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")


val joined_df = df_asPerson.join(
    df_asProfile
, col("dfperson.personid") === col("dfprofile.personid")
, "inner")


joined_df.select(
  col("dfperson.name")
, col("dfperson.age")
, col("dfprofile.name")
, col("dfprofile.profileDescription"))
.show

beispiel Temp Tisch Ansatz, den ich persönlich nicht mag ...

Der Grund für die Verwendung der registerTempTable( tableName ) -Methode für einen DataFrame ist, dass Sie nicht nur die von Spark bereitgestellten Methoden eines DataFrame verwenden können, sondern auch SQL-Abfragen über die sqlContext.sql( sqlQuery ) Methode, die diesen DataFrame als SQL-Tabelle verwendet. Der Parameter tableName gibt den Tabellennamen an, der für diesen DataFrame in den SQL-Abfragen verwendet werden soll.

df_asPerson.registerTempTable("dfperson");
df_asProfile.registerTempTable("dfprofile")

sqlContext.sql("""SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
                  FROM  dfperson JOIN  dfprofile
                  ON dfperson.personid == dfprofile.personid""")

Wenn Sie mehr über Joins wissen möchten, lesen Sie bitte diesen netten Beitrag: beyond-traditional-join-with-Apache-spark

enter image description here

Anmerkung: 1) Wie von @ RaphaelRoth erwähnt,

val resultDf = PersonDf.join(ProfileDf,Seq("personId")) ist ein guter Ansatz, da es keine doppelten Spalten von beiden Seiten gibt, wenn Sie die innere Verknüpfung mit derselben Tabelle verwenden.
2) Spark 2.x-Beispiel aktualisiert in einer anderen Antwort mit allen Join-Operationen, die von spark 2.x mit Beispielen + Ergebnis unterstützt werden

TIPP:

Auch wichtig in Joins: Broadcast-Funktion kann helfen, einen Hinweis zu geben, siehe meine Antwort

45
Ram Ghadiyaram

sie können verwenden

val resultDf = PersonDf.join(ProfileDf, PersonDf("personId") === ProfileDf("personId"))

oder kürzer und flexibler (da Sie problemlos mehr als 1 Spalten für die Verbindung angeben können)

val resultDf = PersonDf.join(ProfileDf,Seq("personId"))
18
Raphael Roth

Einweg

// join type can be inner, left, right, fullouter
val mergedDf = df1.join(df2, Seq("keyCol"), "inner")
// keyCol can be multiple column names seperated by comma
val mergedDf = df1.join(df2, Seq("keyCol1", "keyCol2"), "left")

Ein anderer Weg

import spark.implicits._ 
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName"))
// to select specific columns as output
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName")).select($"d1.*", $"d2.anotherColName")
3
Abu Shoeb

Verwenden Sie in https://spark.Apache.org/docs/1.5.1/api/Java/org/Apache/spark/sql/DataFrame.htmljoin:

Innerer Equi-Join mit einem anderen DataFrame unter Verwendung der angegebenen Spalte.

PersonDf.join(ProfileDf,$"personId")

OR

PersonDf.join(ProfileDf,PersonDf("personId") === ProfileDf("personId"))

Update:

Sie können die DFs auch mit df.registerTempTable("tableName") als temporäre Tabelle speichern und mit sqlContext SQL-Abfragen schreiben.

1
Shankar