wake-up-neo.com

Wie konvertiere ich eine RDD mit einer SparseVector-Spalte in einen DataFrame mit einer Spalte als Vektor

Ich habe einRDDmit einem Tupel von Werten (String, SparseVector) und ich möchte ein DataFrame mit demRDDerstellen. Um ein (label: string, features: vector) DataFrame zu erhalten, benötigen die meisten Bibliotheken des ml-Algorithmus ein Schema. Ich weiß, dass dies möglich ist, weil HashingTF ml Library einen Vektor ausgibt, wenn eine Features-Spalte eines DataFrame angegeben wird.

temp_df = sqlContext.createDataFrame(temp_rdd, StructType([
        StructField("label", DoubleType(), False),
        StructField("tokens", ArrayType(StringType()), False)
    ]))

#assumming there is an RDD (double,array(strings))

hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features")

ndf = hashingTF.transform(temp_df)
ndf.printSchema()

#outputs 
#root
#|-- label: double (nullable = false)
#|-- tokens: array (nullable = false)
#|    |-- element: string (containsNull = true)
#|-- features: vector (nullable = true)

Meine Frage ist also, kann ich irgendwie einRDDvon (String, SparseVector) in ein DataFrame von (String, Vektor) konvertieren lassen. Ich habe es mit dem üblichen sqlContext.createDataFrame versucht, aber es gibt keinen DataType , der meinen Bedürfnissen entspricht.

df = sqlContext.createDataFrame(rdd,StructType([
        StructField("label" , StringType(),True),
        StructField("features" , ?Type(),True)
    ]))
13
Orangel Marquez

Sie müssen hier VectorUDT verwenden:

# In Spark 1.x
# from pyspark.mllib.linalg import SparseVector, VectorUDT
from pyspark.ml.linalg import SparseVector, VectorUDT

temp_rdd = sc.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])

temp_rdd.toDF(schema).printSchema()

## root
##  |-- label: double (nullable = true)
##  |-- features: vector (nullable = true)

Nur der Vollständigkeit halber entspricht Scala:

import org.Apache.spark.sql.Row
import org.Apache.spark.rdd.RDD
import org.Apache.spark.sql.types.{DoubleType, StructType}
// In Spark 1x.
// import org.Apache.spark.mllib.linalg.{Vectors, VectorUDT}
import org.Apache.spark.ml.linalg.Vectors
import org.Apache.spark.ml.linalg.SQLDataTypes.VectorType

val schema = new StructType()
  .add("label", DoubleType)
   // In Spark 1.x
   //.add("features", new VectorUDT())
  .add("features",VectorType)

val temp_rdd: RDD[Row]  = sc.parallelize(Seq(
  Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))),
  Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5))))
))

spark.createDataFrame(temp_rdd, schema).printSchema

// root
// |-- label: double (nullable = true)
// |-- features: vector (nullable = true)
19
zero323

Während @ Zero323 Antwort https://stackoverflow.com/a/32745924/1333621 Sinn macht, und ich wünschte, es funktioniert für mich - die Rdd unter dem DataFrame, sqlContext.createDataFrame (temp_rdd, Schema), die noch enthalten SparseVectors-Typen Ich musste Folgendes tun, um in DenseVector-Typen zu konvertieren - wenn jemand einen kürzeren/besseren Weg hat, möchte ich wissen

temp_rdd = sc.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])

temp_rdd.toDF(schema).printSchema()
df_w_ftr = temp_rdd.toDF(schema)

print 'original convertion method: ',df_w_ftr.take(5)
print('\n')
temp_rdd_dense = temp_rdd.map(lambda x: Row(label=x[0],features=DenseVector(x[1].toArray())))
print type(temp_rdd_dense), type(temp_rdd)
print 'using map and toArray:', temp_rdd_dense.take(5)

temp_rdd_dense.toDF().show()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

original convertion method:  [Row(label=0.0, features=SparseVector(4, {1: 1.0, 3: 5.5})), Row(label=1.0, features=SparseVector(4, {0: -1.0, 2: 0.5}))]


<class 'pyspark.rdd.PipelinedRDD'> <class 'pyspark.rdd.RDD'>
using map and toArray: [Row(features=DenseVector([0.0, 1.0, 0.0, 5.5]), label=0.0), Row(features=DenseVector([-1.0, 0.0, 0.5, 0.0]), label=1.0)]

+------------------+-----+
|          features|label|
+------------------+-----+
| [0.0,1.0,0.0,5.5]|  0.0|
|[-1.0,0.0,0.5,0.0]|  1.0|
+------------------+-----+
4
meyerson

dies ist ein Beispiel in Scala für Spark 2.1

import org.Apache.spark.ml.linalg.Vector

def featuresRDD2DataFrame(features: RDD[Vector]): DataFrame = {
    import sparkSession.implicits._
    val rdd: RDD[(Double, Vector)] = features.map(x => (0.0, x))
    val df = rdd.toDF("label","features").select("features")
    df
  }

die toDF() wurde vom Compiler auf den Features rdd nicht erkannt

1
cipri.l