Betrachten Sie das folgende Snippet (unter der Annahme, dass spark
bereits auf SparkSession
gesetzt ist):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Beachten Sie, dass das Temperaturfeld eine Liste von Schwimmern ist. Ich möchte diese Float-Listen in den MLlib-Typ Vector
konvertieren und möchte, dass diese Konvertierung mit der grundlegenden DataFrame
-API ausgeführt wird, anstatt über RDDs (was ineffizient ist, weil Es sendet alle Daten von der JVM an Python. Die Verarbeitung erfolgt in Python. Wir haben nicht die Vorteile des Catalyst-Optimierers von Spark (yada yada). Wie mache ich das? Speziell:
Dies ist, was ich erwarten würde, die "richtige" Lösung zu sein. Ich möchte den Typ einer Spalte von einem Typ in einen anderen konvertieren, daher sollte ich eine Besetzung verwenden. Lassen Sie mich Sie ein wenig an den normalen Weg erinnern, es auf einen anderen Typ zu übertragen:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Jetzt z. df_with_strings.collect()[0]["temperatures"][1]
ist '-7.0'
. Aber wenn ich auf einen ml Vector wirke, läuft es nicht so gut:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Dies gibt einen Fehler:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
Huch! Irgendwelche Ideen, wie man das behebt?
VectorAssembler
Es gibt ein Transformer
, das für diesen Job fast ideal erscheint: das VectorAssembler
. Es nimmt eine oder mehrere Spalten und verkettet sie zu einem einzigen Vektor. Leider werden nur die Spalten Vector
und Float
verwendet, nicht die Spalten Array
, daher funktioniert Folgendes nicht:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
Es gibt diesen Fehler:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
Die beste Lösung ist, die Liste in mehrere Spalten zu zerlegen und sie dann mit dem Befehl VectorAssembler
wieder zu sichern:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
Dies scheint ideal zu sein, außer dass TEMPERATURE_COUNT
Mehr als 100 und manchmal mehr als 1000 ist. (Ein weiteres Problem ist, dass der Code komplizierter wäre, wenn Sie die Größe des Arrays in nicht kennen Fortschritt, obwohl das bei meinen Daten nicht der Fall ist. Erzeugt Spark) tatsächlich einen Zwischendatensatz mit so vielen Spalten, oder betrachtet es dies nur als einen Zwischenschritt, den einzelne Elemente vorübergehend durchlaufen (oder optimiert es diesen Wegschritt tatsächlich vollständig, wenn es sieht, dass die einzige Verwendung dieser Spalten darin besteht, zu einem Vektor zusammengesetzt zu werden)?
Eine einfachere Alternative ist die Verwendung einer UDF für die Konvertierung. Auf diese Weise kann ich direkt in einer Codezeile ausdrücken, was ich tun möchte, und es ist nicht erforderlich, einen Datensatz mit einer verrückten Anzahl von Spalten zu erstellen. Aber all diese Daten müssen zwischen Python und der JVM ausgetauscht werden, und jede einzelne Nummer muss von Python behandelt werden (was für das Wiederholen notorisch langsam ist) einzelne Datenelemente). So sieht das aus:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
Die restlichen Abschnitte dieser wandernden Frage sind einige zusätzliche Dinge, die ich mir ausgedacht habe, als ich versucht habe, eine Antwort zu finden. Sie können wahrscheinlich von den meisten Lesern übersprungen werden.
Vector
In diesem einfachen Beispiel ist es möglich, die Daten zunächst mit dem Vektortyp zu erstellen, aber meine Daten sind natürlich nicht wirklich eine Python Liste, die ich parallelisiere, sondern sie werden gelesen Aber für den Datensatz ist hier, wie das aussehen würde:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
map()
Eine Möglichkeit besteht darin, die Methode RDD map()
zu verwenden, um die Liste in eine Vector
umzuwandeln. Dies ähnelt der UDF-Idee, ist jedoch noch schlimmer, da die Kosten für die Serialisierung usw. für alle Felder in jeder Zeile anfallen, nicht nur für das zu bearbeitende. Für das Protokoll ist hier, wie diese Lösung aussehen würde:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
In meiner Verzweiflung bemerkte ich, dass Vector
intern durch eine Struktur mit vier Feldern dargestellt wird, aber die Verwendung einer traditionellen Besetzung dieser Art von Struktur funktioniert auch nicht. Hier ist eine Illustration (wo ich die Struktur mit einem udf erstellt habe, aber das udf ist nicht der wichtige Teil):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
Dies gibt den Fehler:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
Persönlich würde ich mich für Python UDF entscheiden und mich um nichts anderes kümmern:
Vectors
sind keine systemeigenen SQL-Typen, daher entsteht in der einen oder anderen Weise ein Performance-Overhead. Insbesondere erfordert dieser Prozess zwei Schritte, bei denen die Daten zuerst von externem Typ in Zeile konvertiert und dann von Zeile in interne Darstellung mit generischem RowEncoder
.Pipeline
ist viel teurer als eine einfache Konvertierung. Darüber hinaus erfordert es ein Verfahren, das dem oben beschriebenen entgegengesetzt istAber wenn Sie hier wirklich andere Optionen wünschen, sind Sie:
Scala UDF mit Python wrapper:
Installieren Sie sbt gemäß den Anweisungen auf der Projektsite.
Erstelle Scala Paket mit folgender Struktur:
.
├── build.sbt
└── udfs.scala
Bearbeiten build.sbt
(Anpassen, um Scala und Spark Version) zu reflektieren):
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.Apache.spark" %% "spark-sql" % "2.1.0",
"org.Apache.spark" %% "spark-mllib" % "2.1.0"
)
Bearbeiten udfs.scala
:
package com.example.spark.udfs
import org.Apache.spark.sql.functions.udf
import org.Apache.spark.ml.linalg.DenseVector
object udfs {
val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
}
Paket:
sbt package
und include (oder gleichwertig je nach Scala vers:
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
als Argument für --driver-class-path
beim Starten von Shell/beim Einreichen der Anwendung.
In PySpark definieren Sie einen Wrapper:
from pyspark.sql.column import _to_Java_column, _to_seq, Column
from pyspark import SparkContext
def as_vector(col):
sc = SparkContext.getOrCreate()
f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
return Column(f.apply(_to_seq(sc, [col], _to_Java_column)))
Prüfung:
with_vec = df.withColumn("vector", as_vector("temperatures"))
with_vec.show()
+--------+------------------+----------------+
| city| temperatures| vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
with_vec.printSchema()
root
|-- city: string (nullable = true)
|-- temperatures: array (nullable = true)
| |-- element: double (containsNull = true)
|-- vector: vector (nullable = true)
Kopieren Sie die Daten in ein JSON-Format, das das DenseVector
-Schema widerspiegelt, und lesen Sie es zurück:
from pyspark.sql.functions import to_json, from_json, col, struct, lit
from pyspark.sql.types import StructType, StructField
from pyspark.ml.linalg import VectorUDT
json_vec = to_json(struct(struct(
lit(1).alias("type"), # type 1 is dense, type 0 is sparse
col("temperatures").alias("values")
).alias("v")))
schema = StructType([StructField("v", VectorUDT())])
with_parsed_vector = df.withColumn(
"parsed_vector", from_json(json_vec, schema).getItem("v")
)
with_parsed_vector.show()
+--------+------------------+----------------+
| city| temperatures| parsed_vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
with_parsed_vector.printSchema()
root
|-- city: string (nullable = true)
|-- temperatures: array (nullable = true)
| |-- element: double (containsNull = true)
|-- parsed_vector: vector (nullable = true)
Ich hatte das gleiche Problem wie Sie und habe es so gemacht. Diese Methode schließt die RDD-Transformation ein, ist also nicht leistungskritisch, funktioniert aber.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.Zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])
new_df
das Ergebnis ist,
DataFrame[city: string, temperatures: vector]