Ich habe diesen Python-Code, der lokal in einem Pandas-Datenrahmen ausgeführt wird:
df_result = pd.DataFrame(df
.groupby('A')
.apply(lambda x: myFunction(Zip(x.B, x.C), x.name))
Ich möchte dies in PySpark ausführen, habe aber Probleme mit dem pyspark.sql.group.GroupedData-Objekt.
Ich habe folgendes versucht:
sparkDF
.groupby('A')
.agg(myFunction(Zip('B', 'C'), 'A'))
die zurückkehrt
KeyError: 'A'
Ich vermute, weil 'A' keine Spalte mehr ist und ich das Äquivalent für x.name nicht finden kann.
Und dann
sparkDF
.groupby('A')
.map(lambda row: Row(myFunction(Zip('B', 'C'), 'A')))
.toDF()
aber erhalte den folgenden Fehler:
AttributeError: 'GroupedData' object has no attribute 'map'
Anregungen wären sehr dankbar!
Sie möchten eine UDAF (User Defined Aggregate Function) im Gegensatz zu einer UDF (User Defined Function) schreiben. UDAFs sind Funktionen, die mit Daten arbeiten, die nach einem Schlüssel gruppiert sind. Insbesondere müssen sie definieren, wie mehrere Werte in der Gruppe in einer einzelnen Partition zusammengeführt werden sollen, und anschließend, wie die Ergebnisse für Schlüssel über Partitionen hinweg zusammengeführt werden. Derzeit gibt es in Python keine Möglichkeit, eine UDAF zu implementieren. Sie können nur in Scala implementiert werden.
Sie können es jedoch in Python umgehen. Sie können den Collect-Set verwenden, um Ihre gruppierten Werte zu sammeln, und dann eine normale UDF verwenden, um mit ihnen zu tun, was Sie möchten. Der einzige Nachteil ist, dass collect_set nur bei primitiven Werten funktioniert. Daher müssen Sie sie in eine Zeichenfolge kodieren.
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf
def myFunc(data_list):
for val in data_list:
b, c = data.split(',')
# do something
return <whatever>
myUdf = udf(myFunc, StringType())
df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
.groupBy('A').agg(collect_list('data').alias('data'))
.withColumn('data', myUdf('data'))
Verwenden Sie collect_set, wenn Sie die Abrechnung durchführen möchten. Wenn Sie für einige Ihrer Schlüssel viele Werte haben, ist dies langsam, da alle Werte für einen Schlüssel in einer einzigen Partition irgendwo in Ihrem Cluster gesammelt werden müssen. Wenn Ihr Endergebnis ein Wert ist, den Sie erstellen, indem Sie die Werte pro Schlüssel auf irgendeine Weise kombinieren (z. B. indem Sie sie summieren), ist es möglicherweise schneller, ihn mit der Methode RDD aggregateByKey zu implementieren, mit der Sie einen Zwischenwert für jeden Schlüssel erstellen können in einer Partition vor dem Mischen von Daten.
EDIT: 21.11.18
Da diese Antwort geschrieben wurde, fügte pyspark hinzu, dass UDAFs Pandas verwenden. Bei der Verwendung der UDFs und UDAFs des Panda gegenüber reinen Python-Funktionen mit RDDs gibt es einige Leistungsverbesserungen von Nice. Unter der Haube vektorisiert es die Spalten (stapelt die Werte aus mehreren Zeilen zusammen, um die Verarbeitung und Komprimierung zu optimieren). Sehen Sie sich here an, um eine bessere Erklärung zu erhalten, oder werfen Sie einen Blick auf user6910411 .
Seit Spark 2.3 können Sie pandas_udf
verwenden. GROUPED_MAP
nimmt Callable[[pandas.DataFrame], pandas.DataFrame]
oder mit anderen Worten eine Funktion, die von Pandas DataFrame
derselben Form wie die Eingabe auf die Ausgabe DataFrame
abbildet.
Zum Beispiel, wenn Daten so aussehen:
df = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)
und Sie möchten den Durchschnittswert von paarweise min zwischen value1
value2
berechnen, müssen Sie das Ausgabeschema definieren:
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_min", DoubleType())
])
pandas_udf
:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
result = pd.DataFrame(df.groupby(df.key).apply(
lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
))
result.reset_index(inplace=True, drop=False)
return result
und wende es an:
df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
| b| -1.5|
| a| -0.5|
+---+-------+
Ihr aktueller Pandas-Code kann ohne Schemadefinition und Dekorator unverändert übernommen werden.
Seit Spark 2.4.0 gibt es auch GROUPED_AGG
variant, was Callable[[pandas.Series, ...], T]
übernimmt, wobei T
ein primitiver Skalar ist:
import numpy as np
@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
return np.minimum(x, y).mean()
die mit Standard group_by
/agg
verwendet werden kann:
df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
+---+-------+
|key|avg_min|
+---+-------+
| b| -1.5|
| a| -0.5|
+---+-------+
Bitte beachten Sie, dass sich weder GROUPED_MAP
noch GROUPPED_AGG
pandas_udf
genauso verhalten wie UserDefinedAggregateFunction
oder Aggregator
, und es ist näher an groupByKey
- oder Fensterfunktionen mit nicht gebundenem Frame. Die Daten werden zuerst gemischt und erst danach wird die UDF angewendet.
Für eine optimale Ausführung sollten Sie Scala UserDefinedAggregateFunction
und Python-Wrapper hinzufügen.
Siehe auch Benutzerdefinierte Funktion, die in PySpark auf Window angewendet werden soll?
Ich werde die Antwort weiter ausdehnen.
Sie können also dieselbe Logik wie pandas.groupby () implementieren. In pyspark mit @pandas_udf .__ anwenden. Diese Methode ist Vektorisierungsmethode und schneller als einfacher udf.
from pyspark.sql.functions import pandas_udf,PandasUDFType
df3 = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_value1", DoubleType()),
StructField("avg_value2", DoubleType()),
StructField("sum_avg", DoubleType()),
StructField("sub_avg", DoubleType())
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
gr = df['key'].iloc[0]
x = df.value1.mean()
y = df.value2.mean()
w = df.value1.mean() + df.value2.mean()
z = df.value1.mean() - df.value2.mean()
return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])
df3.groupby("key").apply(g).show()
Sie erhalten unter dem Ergebnis:
+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
| b| 6.5| -1.5| 5.0| 8.0|
| a| 0.0| 21.0| 21.0| -21.0|
+---+----------+----------+-------+-------+
Sie können also mehr Berechnungen zwischen anderen Feldern in gruppierten Daten durchführen und diese im Listenformat zu einem Datenrahmen hinzufügen.