wake-up-neo.com

Spark fügt dem Datenrahmen eine neue Spalte mit dem Wert aus der vorherigen Zeile hinzu

Ich frage mich, wie ich in Spark (Pyspark) Folgendes erreichen kann

Anfangsdatenrahmen:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+

Resultierender Datenrahmen:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+

Ich schaffe es, neue Spalten im Allgemeinen an ein Datenframe "anzuhängen", indem Sie Folgendes verwenden: df.withColumn("new_Col", df.num * 10)

Ich habe jedoch keine Ahnung, wie ich diese "Zeilenverschiebung" für die neue Spalte erreichen kann, sodass die neue Spalte den Wert eines Felds aus der vorherigen Zeile hat (wie im Beispiel gezeigt). Ich konnte in der API-Dokumentation auch nichts finden, wie auf eine bestimmte Zeile in einem DF nach Index zugegriffen werden kann.

Jede Hilfe wäre dankbar.

26
Kito

Sie können die lag-Fensterfunktion wie folgt verwenden 

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

aber es gibt einige wichtige fragen:

  1. wenn Sie eine globale Operation (nicht durch eine andere Spalte/Spalten partitioniert) benötigen, ist diese äußerst ineffizient.
  2. sie benötigen einen natürlichen Weg, um Ihre Daten zu bestellen. 

Während die zweite Ausgabe fast nie ein Problem ist, kann die erste ein Dealbreaker sein. In diesem Fall sollten Sie einfach Ihre DataFrame in RDD konvertieren und lag manuell berechnen. Siehe zum Beispiel:

Andere nützliche Links:

30
zero323
   val df = sc.parallelize(Seq((4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0))).toDF("id", "num")
df.show
+---+---+
| id|num|
+---+---+
|  4|9.0|
|  3|7.0|
|  2|3.0|
|  1|5.0|
+---+---+
df.withColumn("new_column", lag("num", 1, 0).over(w)).show
+---+---+----------+
| id|num|new_column|
+---+---+----------+
|  1|5.0|       0.0|
|  2|3.0|       5.0|
|  3|7.0|       3.0|
|  4|9.0|       7.0|
+---+---+----------+
0
mputha