wake-up-neo.com

Anwenden von UDFs auf GroupedData in PySpark (mit funktionierendem Python-Beispiel)

Ich habe diesen Python-Code, der lokal in einem Pandas-Datenrahmen ausgeführt wird:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(Zip(x.B, x.C), x.name))

Ich möchte dies in PySpark ausführen, habe aber Probleme mit dem pyspark.sql.group.GroupedData-Objekt.

Ich habe folgendes versucht:

sparkDF
 .groupby('A')
 .agg(myFunction(Zip('B', 'C'), 'A')) 

die zurückkehrt

KeyError: 'A'

Ich vermute, weil 'A' keine Spalte mehr ist und ich das Äquivalent für x.name nicht finden kann.

Und dann

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(Zip('B', 'C'), 'A'))) 
 .toDF()

aber erhalte den folgenden Fehler:

AttributeError: 'GroupedData' object has no attribute 'map'

Anregungen wären sehr dankbar!

19
arosner09

Sie möchten eine UDAF (User Defined Aggregate Function) im Gegensatz zu einer UDF (User Defined Function) schreiben. UDAFs sind Funktionen, die mit Daten arbeiten, die nach einem Schlüssel gruppiert sind. Insbesondere müssen sie definieren, wie mehrere Werte in der Gruppe in einer einzelnen Partition zusammengeführt werden sollen, und anschließend, wie die Ergebnisse für Schlüssel über Partitionen hinweg zusammengeführt werden. Derzeit gibt es in Python keine Möglichkeit, eine UDAF zu implementieren. Sie können nur in Scala implementiert werden. 

Sie können es jedoch in Python umgehen. Sie können den Collect-Set verwenden, um Ihre gruppierten Werte zu sammeln, und dann eine normale UDF verwenden, um mit ihnen zu tun, was Sie möchten. Der einzige Nachteil ist, dass collect_set nur bei primitiven Werten funktioniert. Daher müssen Sie sie in eine Zeichenfolge kodieren.

from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    for val in data_list:
        b, c = data.split(',')
        # do something

    return <whatever>

myUdf = udf(myFunc, StringType())

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
  .groupBy('A').agg(collect_list('data').alias('data'))
  .withColumn('data', myUdf('data'))

Verwenden Sie collect_set, wenn Sie die Abrechnung durchführen möchten. Wenn Sie für einige Ihrer Schlüssel viele Werte haben, ist dies langsam, da alle Werte für einen Schlüssel in einer einzigen Partition irgendwo in Ihrem Cluster gesammelt werden müssen. Wenn Ihr Endergebnis ein Wert ist, den Sie erstellen, indem Sie die Werte pro Schlüssel auf irgendeine Weise kombinieren (z. B. indem Sie sie summieren), ist es möglicherweise schneller, ihn mit der Methode RDD aggregateByKey zu implementieren, mit der Sie einen Zwischenwert für jeden Schlüssel erstellen können in einer Partition vor dem Mischen von Daten.

EDIT: 21.11.18

Da diese Antwort geschrieben wurde, fügte pyspark hinzu, dass UDAFs Pandas verwenden. Bei der Verwendung der UDFs und UDAFs des Panda gegenüber reinen Python-Funktionen mit RDDs gibt es einige Leistungsverbesserungen von Nice. Unter der Haube vektorisiert es die Spalten (stapelt die Werte aus mehreren Zeilen zusammen, um die Verarbeitung und Komprimierung zu optimieren). Sehen Sie sich here an, um eine bessere Erklärung zu erhalten, oder werfen Sie einen Blick auf user6910411 .

31
Ryan Widmaier

Seit Spark 2.3 können Sie pandas_udf verwenden. GROUPED_MAP nimmt Callable[[pandas.DataFrame], pandas.DataFrame] oder mit anderen Worten eine Funktion, die von Pandas DataFrame derselben Form wie die Eingabe auf die Ausgabe DataFrame abbildet.

Zum Beispiel, wenn Daten so aussehen:

df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

und Sie möchten den Durchschnittswert von paarweise min zwischen value1value2 berechnen, müssen Sie das Ausgabeschema definieren:

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

pandas_udf:

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    return result

und wende es an:

df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

Ihr aktueller Pandas-Code kann ohne Schemadefinition und Dekorator unverändert übernommen werden.

Seit Spark 2.4.0 gibt es auch GROUPED_AGG variant, was Callable[[pandas.Series, ...], T] übernimmt, wobei T ein primitiver Skalar ist:

import numpy as np

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
    return np.minimum(x, y).mean()

die mit Standard group_by/agg verwendet werden kann:

df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

Bitte beachten Sie, dass sich weder GROUPED_MAP noch GROUPPED_AGGpandas_udf genauso verhalten wie UserDefinedAggregateFunction oder Aggregator, und es ist näher an groupByKey- oder Fensterfunktionen mit nicht gebundenem Frame. Die Daten werden zuerst gemischt und erst danach wird die UDF angewendet.

Für eine optimale Ausführung sollten Sie Scala UserDefinedAggregateFunction und Python-Wrapper hinzufügen.

Siehe auch Benutzerdefinierte Funktion, die in PySpark auf Window angewendet werden soll?

21
user6910411

Ich werde die Antwort weiter ausdehnen.

Sie können also dieselbe Logik wie pandas.groupby () implementieren. In pyspark mit @pandas_udf .__ anwenden. Diese Methode ist Vektorisierungsmethode und schneller als einfacher udf.

from pyspark.sql.functions import pandas_udf,PandasUDFType

df3 = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])

df3.groupby("key").apply(g).show()

Sie erhalten unter dem Ergebnis:

+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  b|       6.5|      -1.5|    5.0|    8.0|
|  a|       0.0|      21.0|   21.0|  -21.0|
+---+----------+----------+-------+-------+

Sie können also mehr Berechnungen zwischen anderen Feldern in gruppierten Daten durchführen und diese im Listenformat zu einem Datenrahmen hinzufügen.

2
Mayur Dangar