Ich muss mein Dataframe in ein Dataset konvertieren und habe folgenden Code verwendet:
val final_df = Dataframe.withColumn(
"features",
toVec4(
// casting into Timestamp to parse the string, and then into Int
$"time_stamp_0".cast(TimestampType).cast(IntegerType),
$"count",
$"sender_ip_1",
$"receiver_ip_2"
)
).withColumn("label", (Dataframe("count"))).select("features", "label")
final_df.show()
val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
val TrainingDF = trainingTest(0)
val TestingDF=trainingTest(1)
TrainingDF.show()
TestingDF.show()
///lets create our liner regression
val lir= new LinearRegression()
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setMaxIter(100)
.setTol(1E-6)
case class df_ds(features:Vector, label:Integer)
org.Apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
val Training_ds = TrainingDF.as[df_ds]
Mein Problem ist das, Ich habe den folgenden Fehler erhalten:
Error:(96, 36) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val Training_ds = TrainingDF.as[df_ds]
Es scheint, dass sich die Anzahl der Werte in Dataframe von der Anzahl der Werte in meiner Klasse unterscheidet. Allerdings verwende ich case class df_ds(features:Vector, label:Integer)
in meinem TrainingDF-Datenrahmen, da er einen Vektor von Features und eine Ganzzahl-Beschriftung hat. Hier ist TrainingDF-Datenrahmen:
+--------------------+-----+
| features|label|
+--------------------+-----+
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,19...| 19|
|[1.497325796E9,10...| 10|
+--------------------+-----+
Auch hier ist mein Original final_df dataframe:
+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.3| 10.0.0.2| 10|
+------------+-----------+-------------+-----+
Ich habe jedoch den erwähnten Fehler bekommen! Kann mir jemand helfen? Vielen Dank im Voraus.
Die Fehlermeldung, die Sie gerade lesen, ist ein ziemlich guter Hinweis.
Wenn Sie eine DataFrame
in eine Dataset
konvertieren, müssen Sie eine richtige Encoder
für das haben, was in den DataFrame
-Zeilen gespeichert ist.
Encoder für primitivartige Typen (Int
s, String
s usw.) und case classes
werden bereitgestellt, indem Sie die Implizite für Ihre SparkSession
wie folgt importieren:
case class MyData(intField: Int, boolField: Boolean) // e.g.
val spark: SparkSession = ???
val df: DataFrame = ???
import spark.implicits._
val ds: Dataset[MyData] = df.as[MyData]
Wenn das nicht funktioniert, liegt es auch nicht daran, dass der Typ, den Sie versuchen, cast der DataFrame
zu verwenden, nicht unterstützt wird. In diesem Fall müssten Sie Ihre eigene Encoder
schreiben: Sie finden weitere Informationen dazu hier und sehen ein Beispiel (die Encoder
für Java.time.LocalDateTime
) hier .
Spark 1.6.0
case class MyCase(id: Int, name: String)
val encoder = org.Apache.spark.sql.catalyst.encoders.ExpressionEncoder[MyCase]
val dataframe = …
val dataset = dataframe.as(encoder)
Spark 2.0 oder höher
case class MyCase(id: Int, name: String)
val encoder = org.Apache.spark.sql.Encoders.product[MyCase]
val dataframe = …
val dataset = dataframe.as(encoder)