wake-up-neo.com

Wie konvertiere ich eine Array (d. H. Liste) -Spalte in Vector

Kurzversion der Frage!

Betrachten Sie das folgende Snippet (unter der Annahme, dass spark bereits auf SparkSession gesetzt ist):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

Beachten Sie, dass das Temperaturfeld eine Liste von Schwimmern ist. Ich möchte diese Float-Listen in den MLlib-Typ Vector konvertieren und möchte, dass diese Konvertierung mit der grundlegenden DataFrame -API ausgeführt wird, anstatt über RDDs (was ineffizient ist, weil Es sendet alle Daten von der JVM an Python. Die Verarbeitung erfolgt in Python. Wir haben nicht die Vorteile des Catalyst-Optimierers von Spark (yada yada). Wie mache ich das? Speziell:

  1. Gibt es eine Möglichkeit, einen Straight Cast zum Laufen zu bringen? Weiter unten finden Sie Details (und einen fehlgeschlagenen Versuch zur Umgehung). Oder gibt es eine andere Operation, die den gewünschten Effekt hat?
  2. Welche der beiden unten vorgeschlagenen alternativen Lösungen ist effizienter (UDF vs. Auflösen/Zusammensetzen der Elemente in der Liste)? Oder gibt es andere, fast aber nicht ganz richtige Alternativen, die besser sind als beide?

Ein Straight Cast funktioniert nicht

Dies ist, was ich erwarten würde, die "richtige" Lösung zu sein. Ich möchte den Typ einer Spalte von einem Typ in einen anderen konvertieren, daher sollte ich eine Besetzung verwenden. Lassen Sie mich Sie ein wenig an den normalen Weg erinnern, es auf einen anderen Typ zu übertragen:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

Jetzt z. df_with_strings.collect()[0]["temperatures"][1] ist '-7.0'. Aber wenn ich auf einen ml Vector wirke, läuft es nicht so gut:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

Dies gibt einen Fehler:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Huch! Irgendwelche Ideen, wie man das behebt?

Mögliche Alternativen

Alternative 1: Verwenden von VectorAssembler

Es gibt ein Transformer, das für diesen Job fast ideal erscheint: das VectorAssembler . Es nimmt eine oder mehrere Spalten und verkettet sie zu einem einzigen Vektor. Leider werden nur die Spalten Vector und Float verwendet, nicht die Spalten Array, daher funktioniert Folgendes nicht:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

Es gibt diesen Fehler:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

Die beste Lösung ist, die Liste in mehrere Spalten zu zerlegen und sie dann mit dem Befehl VectorAssembler wieder zu sichern:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

Dies scheint ideal zu sein, außer dass TEMPERATURE_COUNT Mehr als 100 und manchmal mehr als 1000 ist. (Ein weiteres Problem ist, dass der Code komplizierter wäre, wenn Sie die Größe des Arrays in nicht kennen Fortschritt, obwohl das bei meinen Daten nicht der Fall ist. Erzeugt Spark) tatsächlich einen Zwischendatensatz mit so vielen Spalten, oder betrachtet es dies nur als einen Zwischenschritt, den einzelne Elemente vorübergehend durchlaufen (oder optimiert es diesen Wegschritt tatsächlich vollständig, wenn es sieht, dass die einzige Verwendung dieser Spalten darin besteht, zu einem Vektor zusammengesetzt zu werden)?

Alternative 2: Verwenden Sie eine UDF

Eine einfachere Alternative ist die Verwendung einer UDF für die Konvertierung. Auf diese Weise kann ich direkt in einer Codezeile ausdrücken, was ich tun möchte, und es ist nicht erforderlich, einen Datensatz mit einer verrückten Anzahl von Spalten zu erstellen. Aber all diese Daten müssen zwischen Python und der JVM ausgetauscht werden, und jede einzelne Nummer muss von Python behandelt werden (was für das Wiederholen notorisch langsam ist) einzelne Datenelemente). So sieht das aus:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

Ignorante Bemerkungen

Die restlichen Abschnitte dieser wandernden Frage sind einige zusätzliche Dinge, die ich mir ausgedacht habe, als ich versucht habe, eine Antwort zu finden. Sie können wahrscheinlich von den meisten Lesern übersprungen werden.

Keine Lösung: Verwenden Sie zunächst Vector

In diesem einfachen Beispiel ist es möglich, die Daten zunächst mit dem Vektortyp zu erstellen, aber meine Daten sind natürlich nicht wirklich eine Python Liste, die ich parallelisiere, sondern sie werden gelesen Aber für den Datensatz ist hier, wie das aussehen würde:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

Ineffiziente Lösung: benutze map()

Eine Möglichkeit besteht darin, die Methode RDD map() zu verwenden, um die Liste in eine Vector umzuwandeln. Dies ähnelt der UDF-Idee, ist jedoch noch schlimmer, da die Kosten für die Serialisierung usw. für alle Felder in jeder Zeile anfallen, nicht nur für das zu bearbeitende. Für das Protokoll ist hier, wie diese Lösung aussehen würde:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Fehlgeschlagener Versuch einer Problemumgehung für die Besetzung

In meiner Verzweiflung bemerkte ich, dass Vector intern durch eine Struktur mit vier Feldern dargestellt wird, aber die Verwendung einer traditionellen Besetzung dieser Art von Struktur funktioniert auch nicht. Hier ist eine Illustration (wo ich die Struktur mit einem udf erstellt habe, aber das udf ist nicht der wichtige Teil):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

Dies gibt den Fehler:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
51
Arthur Tacca

Persönlich würde ich mich für Python UDF entscheiden und mich um nichts anderes kümmern:

Aber wenn Sie hier wirklich andere Optionen wünschen, sind Sie:

  • Scala UDF mit Python wrapper:

    Installieren Sie sbt gemäß den Anweisungen auf der Projektsite.

    Erstelle Scala Paket mit folgender Struktur:

    .
    ├── build.sbt
    └── udfs.scala
    

    Bearbeiten build.sbt (Anpassen, um Scala und Spark Version) zu reflektieren):

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.Apache.spark" %% "spark-sql" % "2.1.0",
      "org.Apache.spark" %% "spark-mllib" % "2.1.0"
    )
    

    Bearbeiten udfs.scala:

    package com.example.spark.udfs
    
    import org.Apache.spark.sql.functions.udf
    import org.Apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
    

    Paket:

    sbt package
    

    und include (oder gleichwertig je nach Scala vers:

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
    

    als Argument für --driver-class-path beim Starten von Shell/beim Einreichen der Anwendung.

    In PySpark definieren Sie einen Wrapper:

    from pyspark.sql.column import _to_Java_column, _to_seq, Column
    from pyspark import SparkContext
    
    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_Java_column)))
    

    Prüfung:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
    
  • Kopieren Sie die Daten in ein JSON-Format, das das DenseVector-Schema widerspiegelt, und lesen Sie es zurück:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
    
15
user6910411

Ich hatte das gleiche Problem wie Sie und habe es so gemacht. Diese Methode schließt die RDD-Transformation ein, ist also nicht leistungskritisch, funktioniert aber.

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.Zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])

new_df

das Ergebnis ist,

DataFrame[city: string, temperatures: vector]
2
GGDammy