Ich habe einen Spark DataFrame (mit PySpark 1.5.1) und möchte eine neue Spalte hinzufügen.
Ich habe folgendes ohne Erfolg versucht:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Auch hier ist ein Fehler aufgetreten:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
Wie füge ich also mit PySpark eine neue Spalte (basierend auf dem Python -Vektor) zu einem vorhandenen DataFrame hinzu?
Sie können einer DataFrame
in Spark keine beliebige Spalte hinzufügen. Neue Spalten können nur mithilfe von Literalen erstellt werden (andere Literaltypen werden unter beschrieben. So fügen Sie eine konstante Spalte in einen Spark -Datenrahmen ein? )
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
eine vorhandene Spalte transformieren:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
eingeschlossen mit join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
oder generiert mit function/udf:
from pyspark.sql.functions import Rand
df_with_x7 = df_with_x6.withColumn("x7", Rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Leistungsmäßig integrierte Funktionen (pyspark.sql.functions
), die dem Catalyst-Ausdruck zugeordnet sind, werden normalerweise den benutzerdefinierten Funktionen Python vorgezogen.
Wenn Sie den Inhalt einer beliebigen RDD als Spalte hinzufügen möchten, können Sie dies tun
zipWithIndex
auf und konvertieren Sie es in einen DatenrahmenSo fügen Sie eine Spalte mithilfe einer UDF hinzu:
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return 'cat1'
Elif value == 2: return 'cat2'
...
else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
Für Spark 2.
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
Ich möchte ein verallgemeinertes Beispiel für einen sehr ähnlichen Anwendungsfall anbieten:
Anwendungsfall: Ich habe eine CSV bestehend aus:
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
Ich muss einige Transformationen durchführen und die endgültige CSV muss so aussehen
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
Ich muss dies tun, da dies das Schema ist, das von einem Modell definiert wird, und ich muss meine endgültigen Daten mit SQL Bulk Inserts und solchen Dingen interoperabel sein.
damit:
1) Ich habe die Original-CSV mit spark.read gelesen und nenne sie "df".
2) Ich mache etwas mit den Daten.
3) Ich füge die Nullspalten mit diesem Skript hinzu:
outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))
df = df.select(outcols)
Auf diese Weise können Sie Ihr Schema nach dem Laden einer CSV-Datei strukturieren (funktioniert auch beim Neuanordnen von Spalten, wenn dies für viele Tabellen erforderlich ist).
Mit den folgenden Schritten können wir zusätzliche Spalten direkt zu DataFrame hinzufügen:
from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
Die einfachste Möglichkeit zum Hinzufügen einer Spalte ist die Verwendung von "withColumn". Da der Datenrahmen mit sqlContext erstellt wird, müssen Sie das Schema angeben, oder er kann standardmäßig im Dataset verfügbar sein. Wenn das Schema angegeben wird, wird die Arbeitslast bei jeder Änderung langwierig.
Nachfolgend finden Sie ein Beispiel, das Sie berücksichtigen können:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default
# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")
# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")
# Check the change
Data.printSchema()