Ich habe zwei Datenrahmen erstellt. Wie können wir mehrere Spark dataframes verbinden?
Beispielsweise :
PersonDf
, ProfileDf
mit einer gemeinsamen Spalte als personId
als (Schlüssel). Wie können wir nun einen Datenrahmen haben, der PersonDf
und ProfileDf
kombiniert?
Sie können die Case-Klasse verwenden, um einen Beispieldatensatz vorzubereiten. Dies ist beispielsweise optional. Sie können DataFrame
auch aus hiveContext.sql
Abrufen.
import org.Apache.spark.sql.functions.col
case class Person(name: String, age: Int, personid : Int)
case class Profile(name: String, personid : Int , profileDescription: String)
val df1 = sqlContext.createDataFrame(
Person("Bindu",20, 2)
:: Person("Raphel",25, 5)
:: Person("Ram",40, 9):: Nil)
val df2 = sqlContext.createDataFrame(
Profile("Spark",2, "SparkSQLMaster")
:: Profile("Spark",5, "SparkGuru")
:: Profile("Spark",9, "DevHunter"):: Nil
)
// you can do alias to refer column name with aliases to increase readablity
val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")
val joined_df = df_asPerson.join(
df_asProfile
, col("dfperson.personid") === col("dfprofile.personid")
, "inner")
joined_df.select(
col("dfperson.name")
, col("dfperson.age")
, col("dfprofile.name")
, col("dfprofile.profileDescription"))
.show
beispiel Temp Tisch Ansatz, den ich persönlich nicht mag ...
df_asPerson.registerTempTable("dfperson");
df_asProfile.registerTempTable("dfprofile")
sqlContext.sql("""SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
FROM dfperson JOIN dfprofile
ON dfperson.personid == dfprofile.personid""")
Anmerkung: 1) Wie von @ RaphaelRoth erwähnt,
val resultDf = PersonDf.join(ProfileDf,Seq("personId"))
ist ein guter Ansatz, da es keine doppelten Spalten von beiden Seiten gibt, wenn Sie die innere Verknüpfung mit derselben Tabelle verwenden.
2) Spark 2.x-Beispiel aktualisiert in einer anderen Antwort mit allen Join-Operationen, die von spark 2.x mit Beispielen + Ergebnis unterstützt werden
Auch wichtig in Joins: Broadcast-Funktion kann helfen, einen Hinweis zu geben, siehe meine Antwort
sie können verwenden
val resultDf = PersonDf.join(ProfileDf, PersonDf("personId") === ProfileDf("personId"))
oder kürzer und flexibler (da Sie problemlos mehr als 1 Spalten für die Verbindung angeben können)
val resultDf = PersonDf.join(ProfileDf,Seq("personId"))
Einweg
// join type can be inner, left, right, fullouter
val mergedDf = df1.join(df2, Seq("keyCol"), "inner")
// keyCol can be multiple column names seperated by comma
val mergedDf = df1.join(df2, Seq("keyCol1", "keyCol2"), "left")
Ein anderer Weg
import spark.implicits._
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName"))
// to select specific columns as output
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName")).select($"d1.*", $"d2.anotherColName")
Verwenden Sie in https://spark.Apache.org/docs/1.5.1/api/Java/org/Apache/spark/sql/DataFrame.htmljoin
:
Innerer Equi-Join mit einem anderen DataFrame unter Verwendung der angegebenen Spalte.
PersonDf.join(ProfileDf,$"personId")
OR
PersonDf.join(ProfileDf,PersonDf("personId") === ProfileDf("personId"))
Update:
Sie können die DFs
auch mit df.registerTempTable("tableName")
als temporäre Tabelle speichern und mit sqlContext
SQL-Abfragen schreiben.