Ich möchte eine Spalte in einer DataFrame
mit einem beliebigen Wert hinzufügen (der für jede Zeile gleich ist). Ich erhalte eine Fehlermeldung, wenn ich withColumn
wie folgt benutze:
dt.withColumn('new_column', 10).head(5)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
1 dt = (messages
2 .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)
/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
1166 [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
1167 """
-> 1168 return self.select('*', col.alias(colName))
1169
1170 @ignore_unicode_prefix
AttributeError: 'int' object has no attribute 'alias'
Es scheint, dass ich die Funktion dazu bringen kann, nach Belieben zu arbeiten, indem ich eine der anderen Spalten hinzufüge und von ihr subtrahiere (also zu Null addiere) und dann die gewünschte Zahl hinzufüge (10 in diesem Fall):
dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=93506471, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=80488242, messagetype=1, dt=4809600.0, new_column=10)]
Das ist extrem hackig, richtig? Ich gehe davon aus, dass es einen legitimeren Weg gibt, dies zu tun?
Spark 2.2+
Spark 2.2 führt typedLit
zur Unterstützung von Seq
, Map
und Tuples
( SPARK-19254 ) ein. Folgende Aufrufe sollten unterstützt werden (Scala):
import org.Apache.spark.sql.functions.typedLit
df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, .0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))
Spark 1.3+ (lit
), 1.4+ (array
, struct
), 2.0+ (map
):
Das zweite Argument für DataFrame.withColumn
sollte eine Column
sein, daher müssen Sie ein Literal verwenden:
from pyspark.sql.functions import lit
df.withColumn('new_column', lit(10))
Wenn Sie komplexe Spalten benötigen, können Sie diese mit Hilfe von Blöcken wie array
erstellen:
from pyspark.sql.functions import array, create_map, struct
df.withColumn("some_array", array(lit(1), lit(2), lit(3)))
df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3)))
df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))
In Scala können genau die gleichen Methoden verwendet werden.
import org.Apache.spark.sql.functions.{array, lit, map, struct}
df.withColumn("new_column", lit(10))
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))
Um Namen für structs
anzugeben, verwenden Sie entweder alias
für jedes Feld:
df.withColumn(
"some_struct",
struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
)
oder cast
für das gesamte Objekt
df.withColumn(
"some_struct",
struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
)
Es ist auch möglich, wenn auch langsamer, eine UDF zu verwenden.
Hinweis:
Die gleichen Konstrukte können verwendet werden, um konstante Argumente an UDFs oder SQL-Funktionen zu übergeben.
In Spark 2.2 gibt es zwei Möglichkeiten, einen konstanten Wert in einer Spalte in DataFrame hinzuzufügen:
1) Verwenden von lit
2) Verwendung von typedLit
.
Der Unterschied zwischen den beiden ist, dass typedLit
auch parametrisierte Scala-Typen handhaben kann, z. List, Seq und Map
Beispiel-Datenrahmen:
val df = spark.createDataFrame(Seq((0,"a"),(1,"b"),(2,"c"))).toDF("id", "col1")
+---+----+
| id|col1|
+---+----+
| 0| a|
| 1| b|
+---+----+
1) Verwenden von lit
: Hinzufügen eines konstanten Zeichenfolgewerts in der neuen Spalte newcol:
import org.Apache.spark.sql.functions.lit
val newdf = df.withColumn("newcol",lit("myval"))
Ergebnis:
+---+----+------+
| id|col1|newcol|
+---+----+------+
| 0| a| myval|
| 1| b| myval|
+---+----+------+
2) Verwendung von typedLit
:
import org.Apache.spark.sql.functions.typedLit
df.withColumn("newcol", typedLit(("sample", 10, .044)))
Ergebnis:
+---+----+-----------------+
| id|col1| newcol|
+---+----+-----------------+
| 0| a|[sample,10,0.044]|
| 1| b|[sample,10,0.044]|
| 2| c|[sample,10,0.044]|
+---+----+-----------------+