wake-up-neo.com

Wie füge ich eine neue Spalte zu einem Spark DataFrame hinzu (mit PySpark)?

Ich habe einen Spark DataFrame (mit PySpark 1.5.1) und möchte eine neue Spalte hinzufügen.

Ich habe folgendes ohne Erfolg versucht:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

Auch hier ist ein Fehler aufgetreten:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

Wie füge ich also mit PySpark eine neue Spalte (basierend auf dem Python -Vektor) zu einem vorhandenen DataFrame hinzu?

105
Boris

Sie können einer DataFrame in Spark keine beliebige Spalte hinzufügen. Neue Spalten können nur mithilfe von Literalen erstellt werden (andere Literaltypen werden unter beschrieben. So fügen Sie eine konstante Spalte in einen Spark -Datenrahmen ein? )

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

eine vorhandene Spalte transformieren:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

eingeschlossen mit join:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

oder generiert mit function/udf:

from pyspark.sql.functions import Rand

df_with_x7 = df_with_x6.withColumn("x7", Rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Leistungsmäßig integrierte Funktionen (pyspark.sql.functions), die dem Catalyst-Ausdruck zugeordnet sind, werden normalerweise den benutzerdefinierten Funktionen Python vorgezogen.

Wenn Sie den Inhalt einer beliebigen RDD als Spalte hinzufügen möchten, können Sie dies tun

165
zero323

So fügen Sie eine Spalte mithilfe einer UDF hinzu:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   Elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+
53
Mark Rajcok

Für Spark 2.

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))
25
Luke W

Ich möchte ein verallgemeinertes Beispiel für einen sehr ähnlichen Anwendungsfall anbieten:

Anwendungsfall: Ich habe eine CSV bestehend aus:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

Ich muss einige Transformationen durchführen und die endgültige CSV muss so aussehen

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

Ich muss dies tun, da dies das Schema ist, das von einem Modell definiert wird, und ich muss meine endgültigen Daten mit SQL Bulk Inserts und solchen Dingen interoperabel sein.

damit:

1) Ich habe die Original-CSV mit spark.read gelesen und nenne sie "df".

2) Ich mache etwas mit den Daten.

3) Ich füge die Nullspalten mit diesem Skript hinzu:

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

Auf diese Weise können Sie Ihr Schema nach dem Laden einer CSV-Datei strukturieren (funktioniert auch beim Neuanordnen von Spalten, wenn dies für viele Tabellen erforderlich ist).

0
bloodrootfc

Mit den folgenden Schritten können wir zusätzliche Spalten direkt zu DataFrame hinzufügen:

from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
0
yogesh

Die einfachste Möglichkeit zum Hinzufügen einer Spalte ist die Verwendung von "withColumn". Da der Datenrahmen mit sqlContext erstellt wird, müssen Sie das Schema angeben, oder er kann standardmäßig im Dataset verfügbar sein. Wenn das Schema angegeben wird, wird die Arbeitslast bei jeder Änderung langwierig.

Nachfolgend finden Sie ein Beispiel, das Sie berücksichtigen können:

from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default 

# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")

# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")

# Check the change 
Data.printSchema()