wake-up-neo.com

Spark Train Test Split

Ich bin gespannt, ob es etwas Ähnliches wie sklearns http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.StratifiedShuffleSplit.html für Apache-spark in der neuesten Version 2.0.1 gibt.

Bisher konnte ich nur https://spark.Apache.org/docs/latest/mllib-statistics.html#stratified-sampling finden, was nicht gut für die Aufteilung eines stark unausgeglichenen Datensatzes in einen Zug geeignet zu sein scheint/Testproben.

8
Georg Heiler

Spark unterstützt geschichtete Beispiele, wie in https://s3.amazonaws.com/sparksummit-share/ml-ams-1.0.1/6-sampling/scala/6-sampling_student.html beschrieben.

df.stat.sampleBy("label", Map(0 -> .10, 1 -> .20, 2 -> .3), 0)
2
Georg Heiler

Obwohl diese Antwort nicht Spark-spezifisch ist, tue ich dies in Apache Beam, um den Zug auf 66% zu teilen und 33% zu testen (nur ein anschauliches Beispiel) Buckets oder Bias-Auswahl in Richtung etwas oder sicherstellen, dass die Randomisierung über Dimensionen hinweg fair ist, usw.):

raw_data = p | 'Read Data' >> Read(...)

clean_data = (raw_data
              | "Clean Data" >> beam.ParDo(CleanFieldsFn())


def partition_fn(element):
    return random.randint(0, 2)

random_buckets = (clean_data | beam.Partition(partition_fn, 3))

clean_train_data = ((random_buckets[0], random_buckets[1])
                    | beam.Flatten())

clean_eval_data = random_buckets[2]
3
Reinaldo Aguiar

Nehmen wir an, wir haben einen Datensatz wie diesen:

+---+-----+
| id|label|
+---+-----+
|  0|  0.0|
|  1|  1.0|
|  2|  0.0|
|  3|  1.0|
|  4|  0.0|
|  5|  1.0|
|  6|  0.0|
|  7|  1.0|
|  8|  0.0|
|  9|  1.0|
+---+-----+

Dieser Datensatz ist perfekt ausbalanciert, aber dieser Ansatz funktioniert auch bei nicht ausbalancierten Daten.

Ergänzen wir nun diesen DataFrame mit zusätzlichen Informationen, die hilfreich bei der Entscheidung sind, welche Zeilen für die Zuggruppe verwendet werden sollen. Die Schritte sind wie folgt:

  • Bestimmen Sie, wie viele Beispiele für jedes Etikett Teil eines Zugsets sein sollen, wenn Sie ratio angeben.
  • Mischen Sie die Zeilen des DataFrame.
  • Verwenden Sie die Fensterfunktion, um den DataFrame nach label zu partitionieren und zu ordnen, und ordnen Sie dann die Beobachtungen der einzelnen Labels mit row_number() an.

Wir erhalten den folgenden Datenrahmen:

+---+-----+----------+
| id|label|row_number|
+---+-----+----------+
|  6|  0.0|         1|
|  2|  0.0|         2|
|  0|  0.0|         3|
|  4|  0.0|         4|
|  8|  0.0|         5|
|  9|  1.0|         1|
|  5|  1.0|         2|
|  3|  1.0|         3|
|  1|  1.0|         4|
|  7|  1.0|         5|
+---+-----+----------+

Hinweis: Die Zeilen werden gemischt (siehe: zufällige Reihenfolge in der Spalte id), nach Beschriftung partitioniert (siehe: Spalte label) und nach Rang sortiert.

Nehmen wir an, wir möchten eine Aufteilung von 80% vornehmen. In diesem Fall möchten wir, dass vier 1.0-Labels und vier 0.0-Labels zum Trainingsdatensatz und ein 1.0-Label und ein 0.0-Label zum Testdatensatz gehören. Wir haben diese Informationen in der Spalte row_number, so dass wir sie jetzt einfach in einer benutzerdefinierten Funktion verwenden können (wenn row_number kleiner oder gleich vier ist, geht das Beispiel zum Zugset).

Nach dem Anwenden der UDF sieht der resultierende Datenrahmen wie folgt aus:

+---+-----+----------+----------+
| id|label|row_number|isTrainSet|
+---+-----+----------+----------+
|  6|  0.0|         1|      true|
|  2|  0.0|         2|      true|
|  0|  0.0|         3|      true|
|  4|  0.0|         4|      true|
|  8|  0.0|         5|     false|
|  9|  1.0|         1|      true|
|  5|  1.0|         2|      true|
|  3|  1.0|         3|      true|
|  1|  1.0|         4|      true|
|  7|  1.0|         5|     false|
+---+-----+----------+----------+

Um die Zug-/Testdaten zu erhalten, muss man nun Folgendes tun:

val train = df.where(col("isTrainSet") === true)
val test = df.where(col("isTrainSet") === false)

Diese Sortier- und Partitionierungsschritte können für einige wirklich große Datasets unzulässig sein. Ich empfehle daher, zunächst das Dataset so weit wie möglich zu filtern. Der physische Plan sieht wie folgt aus:

== Physical Plan ==
*(3) Project [id#4, label#5, row_number#11, if (isnull(row_number#11)) null else UDF(label#5, row_number#11) AS isTrainSet#48]
+- Window [row_number() windowspecdefinition(label#5, label#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#11], [label#5], [label#5 ASC NULLS FIRST]
   +- *(2) Sort [label#5 ASC NULLS FIRST, label#5 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(label#5, 200)
         +- *(1) Project [id#4, label#5]
            +- *(1) Sort [_nondeterministic#9 ASC NULLS FIRST], true, 0
               +- Exchange rangepartitioning(_nondeterministic#9 ASC NULLS FIRST, 200)
                  +- LocalTableScan [id#4, label#5, _nondeterministic#9

Hier ist ein voll funktionsfähiges Beispiel (getestet mit Spark 2.3.0 und Scala 2.11.12):

import org.Apache.spark.SparkConf
import org.Apache.spark.sql.expressions.Window
import org.Apache.spark.sql.{DataFrame, Row, SparkSession}
import org.Apache.spark.sql.functions.{col, row_number, udf, Rand}

class StratifiedTrainTestSplitter {

  def getNumExamplesPerClass(ss: SparkSession, label: String, trainRatio: Double)(df: DataFrame): Map[Double, Long] = {
    df.groupBy(label).count().createOrReplaceTempView("labelCounts")
    val query = f"SELECT $label AS ratioLabel, count, cast(count * $trainRatio as long) AS trainExamples FROM labelCounts"
    import ss.implicits._
    ss.sql(query)
      .select("ratioLabel", "trainExamples")
      .map((r: Row) => r.getDouble(0) -> r.getLong(1))
      .collect()
      .toMap
  }

  def split(df: DataFrame, label: String, trainRatio: Double): DataFrame = {
    val w = Window.partitionBy(col(label)).orderBy(col(label))

    val rowNumPartitioner = row_number().over(w)

    val dfRowNum = df.sort(Rand).select(col("*"), rowNumPartitioner as "row_number")

    dfRowNum.show()

    val observationsPerLabel: Map[Double, Long] = getNumExamplesPerClass(df.sparkSession, label, trainRatio)(df)

    val addIsTrainColumn = udf((label: Double, rowNumber: Int) => rowNumber <= observationsPerLabel(label))

    dfRowNum.withColumn("isTrainSet", addIsTrainColumn(col(label), col("row_number")))
  }


}

object StratifiedTrainTestSplitter {

  def getDf(ss: SparkSession): DataFrame = {
    val data = Seq(
      (0, 0.0), (1, 1.0), (2, 0.0), (3, 1.0), (4, 0.0), (5, 1.0), (6, 0.0), (7, 1.0), (8, 0.0), (9, 1.0)
    )
    ss.createDataFrame(data).toDF("id", "label")
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .config(new SparkConf().setMaster("local[1]"))
      .getOrCreate()

    val df = new StratifiedTrainTestSplitter().split(getDf(spark), "label", 0.8)

    df.cache()

    df.where(col("isTrainSet") === true).show()
    df.where(col("isTrainSet") === false).show()
  }
}

Hinweis: In diesem Fall sind die Bezeichnungen Doubles. Wenn Ihre Bezeichnungen Strings sind, müssen Sie hier und da die Typen wechseln.

2