wake-up-neo.com

Hinzufügen einer Spalte von Rowsumens über eine Liste von Spalten in Spark Dataframe

Ich habe ein Spark-Datenfeld mit mehreren Spalten. Ich möchte dem Dataframe eine Spalte hinzufügen, die die Summe einer bestimmten Anzahl von Spalten ist. 

Zum Beispiel sehen meine Daten so aus:

ID var1 var2 var3 var4 var5
a   5     7    9    12   13
b   6     4    3    20   17
c   4     9    4    6    9
d   1     2    6    8    1

Ich möchte, dass eine Spalte hinzugefügt wird, in der die Zeilen für bestimmte Spalten summiert werden:

ID var1 var2 var3 var4 var5   sums
a   5     7    9    12   13    46
b   6     4    3    20   17    50
c   4     9    4    6    9     32
d   1     2    6    8    10    27

Ich weiß, dass es möglich ist, Spalten zusammen hinzuzufügen, wenn Sie die spezifischen hinzuzufügenden Spalten kennen: 

val newdf = df.withColumn("sumofcolumns", df("var1") + df("var2"))

Kann man eine Liste mit Spaltennamen übergeben und diese zusammen hinzufügen? Basierend auf dieser Antwort, die im Grunde das ist, was ich will, aber es verwendet die Python-API anstelle von Scala ( Spaltensumme als neue Spalte in PySpark-Dataframe hinzufügen ). 

//Select columns to sum
val columnstosum = ("var1", "var2","var3","var4","var5")

// Create new column called sumofcolumns which is sum of all columns listed in columnstosum
val newdf = df.withColumn("sumofcolumns", df.select(columstosum.head, columnstosum.tail: _*).sum)

Dies bewirkt, dass die Fehlerwertsumme kein Mitglied von org.Apache.spark.sql.DataFrame ist. Gibt es eine Möglichkeit, Spalten zu summieren?

Vielen Dank im Voraus für Ihre Hilfe. 

16
Sarah

Sie sollten folgendes versuchen:

import org.Apache.spark.sql.functions._

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val input = sc.parallelize(Seq(
  ("a", 5, 7, 9, 12, 13),
  ("b", 6, 4, 3, 20, 17),
  ("c", 4, 9, 4, 6 , 9),
  ("d", 1, 2, 6, 8 , 1)
)).toDF("ID", "var1", "var2", "var3", "var4", "var5")

val columnsToSum = List(col("var1"), col("var2"), col("var3"), col("var4"), col("var5"))

val output = input.withColumn("sums", columnsToSum.reduce(_ + _))

output.show()

Dann ist das Ergebnis:

+---+----+----+----+----+----+----+
| ID|var1|var2|var3|var4|var5|sums|
+---+----+----+----+----+----+----+
|  a|   5|   7|   9|  12|  13|  46|
|  b|   6|   4|   3|  20|  17|  50|
|  c|   4|   9|   4|   6|   9|  32|
|  d|   1|   2|   6|   8|   1|  18|
+---+----+----+----+----+----+----+
29

Schlicht und einfach:

import org.Apache.spark.sql.Column
import org.Apache.spark.sql.functions.{lit, col}

def sum_(cols: Column*) = cols.foldLeft(lit(0))(_ + _)

val columnstosum = Seq("var1", "var2", "var3", "var4", "var5").map(col _)
df.select(sum_(columnstosum: _*))

mit Python-Äquivalent:

from functools import reduce
from operator import add
from pyspark.sql.functions import lit, col

def sum_(*cols):
    return reduce(add, cols, lit(0))

columnstosum = [col(x) for x in ["var1", "var2", "var3", "var4", "var5"]]
select("*", sum_(*columnstosum))

Beide werden standardmäßig auf NA gesetzt, wenn die Zeile einen fehlenden Wert enthält. Sie können die DataFrameNaFunctions.fill- oder coalesce-Funktion verwenden, um dies zu vermeiden.

8
zero323

Ich gehe davon aus, dass Sie einen Datenrahmen haben. Dann können Sie alle Spalten mit Ausnahme Ihrer ID-Spalte zusammenfassen. Dies ist hilfreich, wenn Sie viele Spalten haben und nicht manuell die Namen aller Spalten wie alle oben genannten angeben möchten. Dieser Beitrag hat die gleiche Antwort. 

val sumAll = df.columns.collect{ case x if x != "ID" => col(x) }.reduce(_ + _)
df.withColumn("sum", sumAll)
2
Abu Shoeb

Hier ist eine elegante Lösung mit Python:

NewDF = OldDF.withColumn('sums', sum(OldDF[col] for col in OldDF.columns[1:]))

Hoffentlich wird dies etwas Ähnliches in Spark beeinflussen ... irgendjemand ?.

0
Aerianis