Ich frage mich, wie ich in Spark (Pyspark) Folgendes erreichen kann
Anfangsdatenrahmen:
+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Resultierender Datenrahmen:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0| 7.0 |
+--+---+-------+
|3 |7.0| 3.0 |
+--+---+-------+
|2 |3.0| 5.0 |
+--+---+-------+
Ich schaffe es, neue Spalten im Allgemeinen an ein Datenframe "anzuhängen", indem Sie Folgendes verwenden: df.withColumn("new_Col", df.num * 10)
Ich habe jedoch keine Ahnung, wie ich diese "Zeilenverschiebung" für die neue Spalte erreichen kann, sodass die neue Spalte den Wert eines Felds aus der vorherigen Zeile hat (wie im Beispiel gezeigt). Ich konnte in der API-Dokumentation auch nichts finden, wie auf eine bestimmte Zeile in einem DF nach Index zugegriffen werden kann.
Jede Hilfe wäre dankbar.
Sie können die lag
-Fensterfunktion wie folgt verwenden
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()
## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## | 2|3.0| 5.0|
## | 3|7.0| 3.0|
## | 4|9.0| 7.0|
## +---+---+-------+
aber es gibt einige wichtige fragen:
Während die zweite Ausgabe fast nie ein Problem ist, kann die erste ein Dealbreaker sein. In diesem Fall sollten Sie einfach Ihre DataFrame
in RDD konvertieren und lag
manuell berechnen. Siehe zum Beispiel:
Andere nützliche Links:
val df = sc.parallelize(Seq((4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0))).toDF("id", "num")
df.show
+---+---+
| id|num|
+---+---+
| 4|9.0|
| 3|7.0|
| 2|3.0|
| 1|5.0|
+---+---+
df.withColumn("new_column", lag("num", 1, 0).over(w)).show
+---+---+----------+
| id|num|new_column|
+---+---+----------+
| 1|5.0| 0.0|
| 2|3.0| 5.0|
| 3|7.0| 3.0|
| 4|9.0| 7.0|
+---+---+----------+