wake-up-neo.com

Spark-Fehler: erwartete Null-Argumente für die Erstellung von ClassDict (für numpy.core.multiarray._reconstruct)

Ich habe ein Datenframe in Spark, in dem eine der Spalten ein Array enthält. Nun habe ich eine separate UDF geschrieben, die das Array in ein anderes Array mit bestimmten Werten konvertiert. Siehe Beispiel unten:

Bsp: [24,23,27,23] sollte in [24, 23, 27] konvertiert werden. Code:

def uniq_array(col_array):
    x = np.unique(col_array)
    return x
uniq_array_udf = udf(uniq_array,ArrayType(IntegerType()))

Df3 = Df2.withColumn("age_array_unique",uniq_array_udf(Df2.age_array))

Im obigen Code ist Df2.age_array das Array, auf das ich die UDF anwende, um eine andere Spalte "age_array_unique" zu erhalten, die nur eindeutige Werte im Array enthalten sollte.

Sobald ich jedoch den Befehl Df3.show() starte, erhalte ich die Fehlermeldung:

net.razorvine.pickle.PickleException: erwartete Null-Argumente für die Erstellung von ClassDict (für numpy.core.multiarray._reconstruct)

Kann mir bitte jemand sagen, warum das so ist?

Vielen Dank!

21
Preyas

Die Ursache des Problems besteht darin, dass das von der UDF zurückgegebene Objekt nicht dem deklarierten Typ entspricht. np.unique gibt nicht nur numpy.ndarray zurück, sondern konvertiert auch Zahlen in die entsprechenden NumPy-Typen die nicht kompatibel sind mit DataFrame-API. Sie können so etwas versuchen:

udf(lambda x: list(set(x)), ArrayType(IntegerType()))

oder dies (um Ordnung zu halten)

udf(lambda xs: list(OrderedDict((x, None) for x in xs)), 
    ArrayType(IntegerType()))

stattdessen.

Wenn Sie wirklich np.unique wollen, müssen Sie die Ausgabe konvertieren:

udf(lambda x: np.unique(x).tolist(), ArrayType(IntegerType()))
27
zero323

Sie müssen den endgültigen Wert in eine Python-Liste konvertieren. Sie implementieren die Funktion wie folgt: 

def uniq_array(col_array):
    x = np.unique(col_array)
    return list(x)

Dies liegt daran, dass Spark das Numpy-Array-Format nicht versteht. Um ein Python-Objekt zu speisen, das Spark DataFrames als ArrayType versteht, müssen Sie die Ausgabe in ein Python list konvertieren, bevor Sie sie zurückgeben.

1
user1632287

Ab Pyspark Version 2.4 können Sie die array_distinct-Transformation verwenden.
http://spark.Apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.array_distinct

0
Crow59