Ich bin neu bei Spark.
Ich habe CSV-Daten in einen Spark-DataFrame geladen.
Ich muss dieses Datenframe in zwei unterschiedliche Datenframes aufteilen, wobei jedes eine Reihe von Spalten des Originals enthält.
Meine Frage ist also, wie man einen Spark-Datenrahmen basierend auf Spalten subset?
Wenn Sie Ihren Datenrahmen in zwei verschiedene Bereiche unterteilen möchten, führen Sie zwei Auswahlmöglichkeiten mit den gewünschten Spalten aus.
val sourceDf = spark.read.csv(...)
val df1 = sourceDF.select(first set of columns)
val df2 = sourceDF.select(second set of columns)
Beachten Sie, dass dies natürlich bedeutet, dass die sourceDf zweimal ausgewertet wird. Wenn sie also in den verteilten Speicher passen kann und Sie die meisten Spalten in beiden Datenrahmen verwenden, empfiehlt es sich möglicherweise, sie im Cache zu speichern. Es gibt viele zusätzliche Spalten, die Sie nicht benötigen. Dann können Sie zunächst eine Auswahl treffen, um die Spalten auszuwählen, die Sie benötigen, damit alle zusätzlichen Daten im Speicher abgelegt werden.
Nehmen wir an, unser übergeordnetes Dataframe hat 'n' Spalten
wir können 'x' child-DataFrames erstellen (betrachten wir in unserem Fall 2).
Die Spalten für das untergeordnete Dataframe können nach Belieben aus einer der übergeordneten Dataframe-Spalten ausgewählt werden.
Angenommen, source hat 10 Spalten und wir möchten in 2 DataFrames aufteilen, das Spalten enthält, auf die vom übergeordneten Dataframe verwiesen wird.
Die Spalten für das untergeordnete Dataframe können mit der select Dataframe-API festgelegt werden
val parentDF = spark.read.format("csv").load("/path of the CSV file")
val Child1_DF = parentDF.select("col1","col2","col3","col9","col10").show()
val child2_DF = parentDF.select("col5", "col6","col7","col8","col1","col2").show()
Beachten Sie, dass die Spaltenanzahl in den untergeordneten Datenrahmen sich in der Länge unterscheiden kann und unter der Anzahl der übergeordneten Datenrahmenspalten liegt.
wir können auch auf die Spaltennamen verweisen, ohne die realen Namen zu erwähnen, indem Sie die Positionsindizes der gewünschten Spalte aus dem übergeordneten Datenrahmen verwenden
Import spark implicits first, der als Hilfsklasse für die Verwendung von $ -notation für den Zugriff auf die Spalten über die Positionsindizes fungiert
import spark.implicits._
import org.Apache.spark.sql.functions._
val child3_DF = parentDF.select("_c0","_c1","_c2","_c8","_c9").show()
wir können auch Spalten auswählen, die auf bestimmten Bedingungen basieren. Nehmen wir an, wir möchten, dass im untergeordneten Datenrahmen nur geradzahlige Spalten ausgewählt werden. Wir beziehen uns sogar auf sogar indizierte Spalten und Index, die von '0' ausgehen.
val parentColumns = parentDF.columns.toList
res0: List[String] = List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7,_c8,_c9)
val evenParentColumns = res0.zipWithIndex.filter(_._2 % 2 == 0).map( _._1).toSeq
res1: scala.collection.immutable.Seq[String] = List(_c0, _c2, _c4, _c6,_c8)
Nun füttern Sie diese Spalten, um sie aus parentDF auszuwählen. Beachten Sie, dass die Auswahl-API Argumente vom Typ "seq" benötigt. So konvertierten wir die "evenParentColumns" in die Seq-Sammlung
val child4_DF = parentDF.select(res1.head, res1.tail:_*).show()
Dadurch werden die gerade indizierten Spalten des übergeordneten Datenframes angezeigt.
| _c0 | _c2 | _c4 | _c6 | _c8 |
| ITE00100554 | TMAX | null | E | 1 |
| TE00100554 | TMIN | null | E | 4 |
| GM000010962 | PRCP | null | E | 7 |
Jetzt verbleiben die geraden Spalten im Datenrahmen
In ähnlicher Weise können wir auch andere Operationen auf die Dataframe-Spalte anwenden (siehe unten)
val child5_DF = parentDF.select($"_c0", $"_c8" + 1).show()
Wie bereits erwähnt, können wir also die Spalten im Dataframe auswählen.
Mit dem folgenden Code können Sie Spalten anhand ihres Index (ihrer Position) auswählen. Sie können die Zahlen für die Variable colNos ändern, um nur diese Spalten auszuwählen
import org.Apache.spark.sql.functions.col
val colNos = Seq(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35)
val Df_01 = Df.select(colNos_01 map Df.columns map col: _*)
Df_01.show(20, false)
Gelöst, Benutze einfach die Methode select für den Datenrahmen, um Spalten auszuwählen:
val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\cabs_trajectories\\cabs_trajectories\\green\\2014\\green_tripdata_2014-09.csv")
val df1=df.select("_c0")
dies würde die erste Spalte des Datenrahmens unterteilen
Mit select select
können Sie bestimmte Spalten auswählen, ihnen lesbare Namen geben und sie umsetzen. Zum Beispiel so:
spark.read.csv(path).select(
'_c0.alias("stn").cast(StringType),
'_c1.alias("wban").cast(StringType),
'_c2.alias("lat").cast(DoubleType),
'_c3.alias("lon").cast(DoubleType)
)
.where('_c2.isNotNull && '_c3.isNotNull && '_c2 =!= 0.0 && '_c3 =!= 0.0)