wake-up-neo.com

Aktualisieren einer Datenrahmenspalte in spark

Betrachtet man die neue spark dataframe api, ist unklar, ob es möglich ist, Dataframe-Spalten zu ändern.

Wie gehe ich vor, um einen Wert in Zeile x Spalte y eines Datenrahmens zu ändern?

In pandas wäre dies df.ix[x,y] = new_value

Bearbeiten: Konsolidieren Sie das, was weiter unten gesagt wurde. Sie können den vorhandenen Datenrahmen nicht ändern, da er unveränderlich ist. Sie können jedoch einen neuen Datenrahmen mit den gewünschten Änderungen zurückgeben.

Wenn Sie nur einen Wert in einer Spalte basierend auf einer Bedingung ersetzen möchten, z. B. np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

Wenn Sie eine Operation für eine Spalte ausführen und eine neue Spalte erstellen möchten, die dem Datenrahmen hinzugefügt wird:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

Wenn Sie möchten, dass die neue Spalte denselben Namen wie die alte Spalte hat, können Sie den folgenden zusätzlichen Schritt hinzufügen:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
61
Luke

Während Sie eine Spalte nicht als solche ändern können, können Sie eine Spalte bearbeiten und einen neuen DataFrame zurückgeben, der diese Änderung widerspiegelt. Dazu erstellen Sie zunächst ein UserDefinedFunction, das die anzuwendende Operation implementiert, und wenden diese Funktion dann selektiv nur auf die Zielspalte an. In Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df hat jetzt das gleiche Schema wie old_df (vorausgesetzt, dass old_df.target_column war ebenfalls vom Typ StringType, aber alle Werte in Spalte target_column wird sein new_value.

62
karlson

In der Regel möchten wir beim Aktualisieren einer Spalte einen alten Wert einem neuen Wert zuordnen. Hier ist eine Möglichkeit, dies in Pyspark ohne UDFs zu tun:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).
38
Paul

DataFrames basieren auf RDDs. RDDs sind unveränderliche Strukturen und ermöglichen keine Aktualisierung von Elementen vor Ort. Um Werte zu ändern, müssen Sie einen neuen DataFrame erstellen, indem Sie den ursprünglichen mit SQL-ähnlichen DSL- oder RDD-Operationen wie map transformieren.

Ein sehr empfehlenswertes Dia-Deck: Einführung in DataFrames in Spark für Large Scale Data Science .

13
maasg

Genau wie maasg sagt, dass Sie einen neuen DataFrame aus dem Ergebnis einer Zuordnung erstellen können, die auf den alten DataFrame angewendet wird. Ein Beispiel für einen bestimmten DataFrame df mit zwei Zeilen:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Beachten Sie, dass Sie bei einer Änderung der Spaltentypen anstelle von df.schema Ein korrektes Schema angeben müssen. Überprüfen Sie die API von org.Apache.spark.sql.Row Auf verfügbare Methoden: https://spark.Apache.org/docs/latest/api/Java/org/Apache/spark/sql/Row.html =

[Update] Oder mit UDFs in Scala:

import org.Apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

und wenn der Spaltenname gleich bleiben soll, können Sie ihn wieder umbenennen:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
11
radek1st