wake-up-neo.com

Sollten wir einen DataFrame so parallelisieren, wie wir einen Seq vor dem Training parallelisieren?

Betrachten Sie den hier angegebenen Code,

https://spark.Apache.org/docs/1.2.0/ml-guide.html

import org.Apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)

Angenommen, wir lesen "training" als Datenframe mit sqlContext.read (), sollten wir trotzdem so etwas tun

val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this

oder die Fit-Funktion kümmert sich automatisch um die Parallelisierung der Berechnung/Daten, wenn ein DataFrame übergeben wird

Grüße,

5
Abhishek

DataFrame ist eine verteilte Datenstruktur. Es ist weder erforderlich noch möglich, parallelize anzugeben. Die Methode SparkConext.parallelize wird nur für verteilte lokale Datenstrukturen verwendet, die sich im Treiberspeicher befinden. Sie sollten nicht daran gewöhnt sein, große Datenmengen zu verteilen, ganz zu schweigen von der Umverteilung von RDDs oder übergeordneten Datenstrukturen (wie in Ihrer vorherigen Frage).

sc.parallelize(trainingData.collect()) 

Wenn Sie zwischen RDD/Dataframe (Dataset) konvertieren möchten, verwenden Sie die dafür vorgesehenen Methoden:

  1. von DataFrame zu RDD:

    import org.Apache.spark.sql.DataFrame
    import org.Apache.spark.sql.Row
    import org.Apache.spark.rdd.RDD
    
    val df: DataFrame  = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
    val rdd: RDD[Row] = df.rdd
    
  2. form RDD to DataFrame:

    val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
    val df1: DataFrame = rdd.toDF
    // or
    val df2: DataFrame = spark.createDataFrame(rdd) // From 1.x use sqlContext
    
9
zero323

Sie sollten vielleicht den Unterschied zwischen RDD und DataFrame überprüfen und wie man zwischen den beiden konvertiert: Unterschied zwischen DataFrame und RDD in Spark

So beantworten Sie Ihre Frage direkt: Ein DataFrame ist bereits für die parallele Ausführung optimiert. Sie müssen nichts tun und können es direkt an die fit () -Methode eines beliebigen Zündfunkenschätzers übergeben. Die parallelen Ausführungen werden im Hintergrund ausgeführt.

1
Timomo