wake-up-neo.com

Wie bereiten Sie Daten aus DataFrame in ein LibSVM-Format vor?

Ich möchte das libsvm-Format erstellen, also habe ich Dataframe in das gewünschte Format gebracht, weiß aber nicht, wie ich das libsvm-Format konvertieren soll. Das Format ist wie in der Abbildung dargestellt. Ich hoffe, dass der gewünschte libsvm-Typ Benutzerelement ist: Bewertung . Wenn Sie wissen, was in der aktuellen Situation zu tun ist:

val ratings = sc.textFile(new File("/user/ubuntu/kang/0829/rawRatings.csv").toString).map { line =>
     val fields = line.split(",")
      (fields(0).toInt,fields(1).toInt,fields(2).toDouble)
}
val user = ratings.map{ case (user,product,rate) => (user,(product.toInt,rate.toDouble))}
val usergroup = user.groupByKey 

val data =usergroup.map{ case(x,iter) => (x,iter.map(_._1).toArray,iter.map(_._2).toArray)}

val data_DF = data.toDF("user","item","rating")

DATAFRAME FIGURE

Ich verwende Spark 2.0.

12
Data diaboli

Das Problem, mit dem Sie konfrontiert sind, kann in folgende Bereiche unterteilt werden:

  • Konvertieren Sie Ihre Bewertungen (glaube ich) in LabeledPoint data X.
  • Speichern von X im Format libsvm.

1. Konvertieren Sie Ihre Bewertungen in LabeledPoint Daten X

Betrachten wir die folgenden Rohbewertungen:

val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")

Sie können diese Rohbewertungen als Koordinatenlistenmatrix (COO) behandeln.

Spark implementiert eine verteilte Matrix, die durch eine RDD ihrer Einträge unterstützt wird: CoordinateMatrix, wobei jeder Eintrag ein Tuple von (i: Long, j: Long, Wert: Double) ist.

Hinweis: Eine CoordinateMatrix sollte nur verwendet werden, wenn beide Dimensionen der Matrix sehr groß sind und die Matrix sehr spärlich ist. (Normalerweise bei Benutzer-/Artikelbewertungen)

import org.Apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.Apache.spark.rdd.RDD

val data: RDD[MatrixEntry] = 
      sc.parallelize(rawRatings).map {
            line => {
                  val fields = line.split(",")
                  val i = fields(0).toLong
                  val j = fields(1).toLong
                  val value = fields(2).toDouble
                  MatrixEntry(i, j, value)
            }
      }

Nun konvertieren wir den RDD[MatrixEntry] in eine CoordinateMatrix und extrahieren die indizierten Zeilen:

val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
                .toIndexedRowMatrix().rows // Extract indexed rows
                .toDF("label", "features") // Convert rows

2. Speichern von LabeledPoint-Daten im libsvm-Format

Seit Spark 2.0 können Sie dies mit der DataFrameWriter tun. Lassen Sie uns ein kleines Beispiel mit einigen Dummy-LabeledPoint-Daten erstellen (Sie können auch die DataFrame verwenden, die wir zuvor erstellt haben):

import org.Apache.spark.mllib.linalg.Vectors
import org.Apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

val df = Seq(neg,pos).toDF("label","features")

Leider können wir die Variable DataFrameWriter immer noch nicht direkt verwenden, da die meisten Pipelinekomponenten Abwärtskompatibilität für das Laden unterstützen. Einige vorhandene DataFrames und Pipelines in Spark-Versionen vor 2.0, die Vektor- oder Matrixspalten enthalten, müssen möglicherweise auf den neuen Funken migriert werden. ml Vektor- und Matrixtypen. 

Dienstprogramme zum Konvertieren von DataFrame-Spalten von mllib.linalg in ml.linalg-Typen (und umgekehrt) finden Sie in org.Apache.spark.mllib.util.MLUtils. In unserem Fall müssen Sie Folgendes tun (sowohl für die Dummy-Daten als auch für die DataFrame von step 1.)

import org.Apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)

Jetzt speichern wir den DataFrame:

convertedVecDF.write.format("libsvm").save("data/foo")

Und wir können den Inhalt der Dateien überprüfen: 

$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0

EDIT: In der aktuellen Version von spark (2.1.0) muss kein mllib-Paket verwendet werden. Sie können einfach LabeledPoint-Daten im libsvm-Format wie folgt speichern:

import org.Apache.spark.ml.linalg.Vectors
import org.Apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")
14
eliasah

libsvm datatype features ist ein spärlicher Vektor. Sie können pyspark.ml.linalg.SparseVector verwenden, um das Problem zu lösen

a = SparseVector(4, [1, 3], [3.0, 4.0])

def sparsevecfuc(len,index,score):
    """
     args: len int, index array, score array
    """
    return SparseVector(len,index,score)
trans_sparse = udf(sparsevecfuc,VectorUDT())
0
MaxYu

Um ein vorhandenes in ein typisiertes DataSet zu konvertieren, schlage ich Folgendes vor: Verwenden Sie die folgende Fallklasse:

case class LibSvmEntry (
   value: Double,
   features: L.Vector)

Sie können die map-Funktion verwenden, um sie wie folgt in einen LibSVM-Eintrag zu konvertieren: df.map[LibSvmEntry](r: Row => /* Do your stuff here*/)

0
Elior Malul