wake-up-neo.com

Filtern von Zeilen basierend auf Spaltenwerten in Spark-Datenrahmenskala

Ich habe ein Datenfeld (Funke):

id  value 
3     0
3     1
3     0
4     1
4     0
4     0

Ich möchte einen neuen Datenrahmen erstellen:

3 0
3 1
4 1

Sie müssen alle Zeilen nach 1(value) für jede ID entfernen. Ich habe es mit Fensterfunktionen in Spark Dateframe (Scala) versucht. Aber ich konnte keine Lösung finden. Ich gehe in eine falsche Richtung.

Ich suche nach einer Lösung in Scala. Danke

Ausgabe mit monotonically_increasing_id

 scala> val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data: org.Apache.spark.sql.DataFrame = [id: int, value: int]

scala> val minIdx = dataWithIndex.filter($"value" === 1).groupBy($"id").agg(min($"idx")).toDF("r_id", "min_idx")
minIdx: org.Apache.spark.sql.DataFrame = [r_id: int, min_idx: bigint]

scala> dataWithIndex.join(minIdx,($"r_id" === $"id") && ($"idx" <= $"min_idx")).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  4|    1|
+---+-----+

Die Lösung funktioniert nicht, wenn wir im ursprünglichen Datenrahmen eine sortierte Transformation vorgenommen haben. Zu dieser Zeit wird die monotonically_increasing_id () basierend auf dem Original DF generiert, anstatt dass das sortierte DF.I diese Anforderung zuvor verfehlt hat.

Alle Vorschläge sind willkommen. 

9
John

Eine Möglichkeit ist, monotonically_increasing_id() und einen Self-Join zu verwenden:

val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data.show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  3|    0|
|  4|    1|
|  4|    0|
|  4|    0|
+---+-----+

Jetzt generieren wir eine Spalte mit dem Namen idx mit zunehmendem Long:

val dataWithIndex = data.withColumn("idx", monotonically_increasing_id())
// dataWithIndex.cache()

Jetzt erhalten wir die min(idx) für jeden id wo value = 1:

val minIdx = dataWithIndex
               .filter($"value" === 1)
               .groupBy($"id")
               .agg(min($"idx"))
               .toDF("r_id", "min_idx")

Nun fügen wir die min(idx) zurück zum ursprünglichen DataFrame:

dataWithIndex.join(
  minIdx,
  ($"r_id" === $"id") && ($"idx" <= $"min_idx")
).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  4|    1|
+---+-----+

Hinweis: monotonically_increasing_id() generiert seinen Wert basierend auf der Partition der Zeile. Dieser Wert kann sich jedes Mal ändern, wenn dataWithIndex erneut ausgewertet wird. In meinem obigen Code wird wegen der schlechten Bewertung nur, wenn ich das letzte show-Objekt anrufe, monotonically_increasing_id() ausgewertet.

Wenn Sie möchten, dass der Wert gleich bleibt, z. B. um show zu verwenden, um den obigen Schritt für Schritt auszuwerten, entfernen Sie die folgende Zeile oben:

//  dataWithIndex.cache()
8
David Griffin

Hallo, ich habe die Lösung mit Window und Self Join gefunden.

val data = Seq((3,0,2),(3,1,3),(3,0,1),(4,1,6),(4,0,5),(4,0,4),(1,0,7),(1,1,8),(1,0,9),(2,1,10),(2,0,11),(2,0,12)).toDF("id", "value","sorted")

data.show

scala> data.show
+---+-----+------+
| id|value|sorted|
+---+-----+------+
|  3|    0|     2|
|  3|    1|     3|
|  3|    0|     1|
|  4|    1|     6|
|  4|    0|     5|
|  4|    0|     4|
|  1|    0|     7|
|  1|    1|     8|
|  1|    0|     9|
|  2|    1|    10|
|  2|    0|    11|
|  2|    0|    12|
+---+-----+------+




val sort_df=data.sort($"sorted")

scala> sort_df.show
+---+-----+------+
| id|value|sorted|
+---+-----+------+
|  3|    0|     1|
|  3|    0|     2|
|  3|    1|     3|
|  4|    0|     4|
|  4|    0|     5|
|  4|    1|     6|
|  1|    0|     7|
|  1|    1|     8|
|  1|    0|     9|
|  2|    1|    10|
|  2|    0|    11|
|  2|    0|    12|
+---+-----+------+



var window=Window.partitionBy("id").orderBy("$sorted")

 val sort_idx=sort_df.select($"*",rowNumber.over(window).as("count_index"))

val minIdx=sort_idx.filter($"value"===1).groupBy("id").agg(min("count_index")).toDF("idx","min_idx")

val result_id=sort_idx.join(minIdx,($"id"===$"idx") &&($"count_index" <= $"min_idx"))

result_id.show

+---+-----+------+-----------+---+-------+
| id|value|sorted|count_index|idx|min_idx|
+---+-----+------+-----------+---+-------+
|  1|    0|     7|          1|  1|      2|
|  1|    1|     8|          2|  1|      2|
|  2|    1|    10|          1|  2|      1|
|  3|    0|     1|          1|  3|      3|
|  3|    0|     2|          2|  3|      3|
|  3|    1|     3|          3|  3|      3|
|  4|    0|     4|          1|  4|      3|
|  4|    0|     5|          2|  4|      3|
|  4|    1|     6|          3|  4|      3|
+---+-----+------+-----------+---+-------+

Auf der Suche nach optimierten Lösungen. Danke

1
John
use isin method and filter as below:

val data = Seq((3,0,2),(3,1,3),(3,0,1),(4,1,6),(4,0,5),(4,0,4),(1,0,7),(1,1,8),(1,0,9),(2,1,10),(2,0,11),(2,0,12)).toDF("id", "value","sorted")
val idFilter = List(1, 2)
 data.filter($"id".isin(idFilter:_*)).show
+---+-----+------+
| id|value|sorted|
+---+-----+------+
|  1|    0|     7|
|  1|    1|     8|
|  1|    0|     9|
|  2|    1|    10|
|  2|    0|    11|
|  2|    0|    12|
+---+-----+------+

Ex: filter based on val
val valFilter = List(0)
data.filter($"value".isin(valFilter:_*)).show
+---+-----+------+
| id|value|sorted|
+---+-----+------+
|  3|    0|     2|
|  3|    0|     1|
|  4|    0|     5|
|  4|    0|     4|
|  1|    0|     7|
|  1|    0|     9|
|  2|    0|    11|
|  2|    0|    12|
+---+-----+------+
0
mputha

Sie können groupBy einfach so verwenden

val df2 = df1.groupBy("id","value").count().select("id","value")

Hier ist Ihr df1

id  value 
3     0
3     1
3     0
4     1
4     0
4     0

Das resultierende Datenframe ist df2. Dies ist Ihre erwartete Ausgabe wie diese

id  value 
3     0
3     1
4     1
4     0
0
Abu Shoeb