wake-up-neo.com

Wie konvertiert man einen Datenrahmen in einen Datensatz in Apache Spark in Scala?

Ich muss mein Dataframe in ein Dataset konvertieren und habe folgenden Code verwendet:

    val final_df = Dataframe.withColumn(
      "features",
      toVec4(
        // casting into Timestamp to parse the string, and then into Int
        $"time_stamp_0".cast(TimestampType).cast(IntegerType),
        $"count",
        $"sender_ip_1",
        $"receiver_ip_2"
      )
    ).withColumn("label", (Dataframe("count"))).select("features", "label")

    final_df.show()

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
    val TrainingDF = trainingTest(0)
    val TestingDF=trainingTest(1)
    TrainingDF.show()
    TestingDF.show()

    ///lets create our liner regression
    val lir= new LinearRegression()
    .setRegParam(0.3)
    .setElasticNetParam(0.8)
    .setMaxIter(100)
    .setTol(1E-6)

    case class df_ds(features:Vector, label:Integer)
    org.Apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

    val Training_ds = TrainingDF.as[df_ds]

Mein Problem ist das, Ich habe den folgenden Fehler erhalten:

Error:(96, 36) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val Training_ds = TrainingDF.as[df_ds]

Es scheint, dass sich die Anzahl der Werte in Dataframe von der Anzahl der Werte in meiner Klasse unterscheidet. Allerdings verwende ich case class df_ds(features:Vector, label:Integer) in meinem TrainingDF-Datenrahmen, da er einen Vektor von Features und eine Ganzzahl-Beschriftung hat. Hier ist TrainingDF-Datenrahmen:

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,10...|   10|
+--------------------+-----+

Auch hier ist mein Original final_df dataframe:

+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.3|     10.0.0.2|   10|
+------------+-----------+-------------+-----+

Ich habe jedoch den erwähnten Fehler bekommen! Kann mir jemand helfen? Vielen Dank im Voraus. 

9
user8131063

Die Fehlermeldung, die Sie gerade lesen, ist ein ziemlich guter Hinweis.

Wenn Sie eine DataFrame in eine Dataset konvertieren, müssen Sie eine richtige Encoder für das haben, was in den DataFrame-Zeilen gespeichert ist.

Encoder für primitivartige Typen (Ints, Strings usw.) und case classes werden bereitgestellt, indem Sie die Implizite für Ihre SparkSession wie folgt importieren:

case class MyData(intField: Int, boolField: Boolean) // e.g.

val spark: SparkSession = ???
val df: DataFrame = ???

import spark.implicits._

val ds: Dataset[MyData] = df.as[MyData]

Wenn das nicht funktioniert, liegt es auch nicht daran, dass der Typ, den Sie versuchen, cast der DataFrame zu verwenden, nicht unterstützt wird. In diesem Fall müssten Sie Ihre eigene Encoder schreiben: Sie finden weitere Informationen dazu hier und sehen ein Beispiel (die Encoder für Java.time.LocalDateTime) hier .

16
stefanobaghino

Spark 1.6.0

case class MyCase(id: Int, name: String)

val encoder = org.Apache.spark.sql.catalyst.encoders.ExpressionEncoder[MyCase]

val dataframe = …

val dataset = dataframe.as(encoder)

Spark 2.0 oder höher

case class MyCase(id: Int, name: String)

val encoder = org.Apache.spark.sql.Encoders.product[MyCase]

val dataframe = …

val dataset = dataframe.as(encoder)
2
Shang Gao