Angenommen, ich mache etwas wie:
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()
root
|-- year: string (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)
df.show()
year make model comment blank
2012 Tesla S No comment
1997 Ford E350 Go get one now th...
aber ich wollte das year
wirklich als Int
(und vielleicht einige andere Spalten transformieren).
Das Beste, was ich mir einfallen lassen konnte, ist
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.Apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]
das ist ein bisschen gewunden.
Ich komme aus R und bin es gewohnt, schreiben zu können, z.
df2 <- df %>%
mutate(year = year %>% as.integer,
make = make %>% toupper)
Ich vermisse wahrscheinlich etwas, da es einen besseren Weg geben sollte, dies in spark/scala zu tun ...
Seit spark 2.x können Sie .withColumn
verwenden. Überprüfen Sie die Dokumente hier:
Seit Spark Version 1.4 können Sie die Umwandlungsmethode mit DataType auf die Spalte anwenden:
import org.Apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
.drop("year")
.withColumnRenamed("yearTmp", "year")
Wenn Sie SQL-Ausdrücke verwenden, können Sie auch Folgendes tun:
val df2 = df.selectExpr("cast(year as int) year",
"make",
"model",
"comment",
"blank")
Weitere Informationen finden Sie in den Dokumenten: http://spark.Apache.org/docs/1.6.0/api/scala/#org.Apache.spark.sql.DataFrame
[EDIT: März 2016: Danke für die Stimmen! Obwohl dies wirklich nicht die beste Antwort ist, denke ich, dass die Lösungen, die auf withColumn
, withColumnRenamed
und cast
basieren, die von msemelman, Martin Senne und anderen vorgeschlagen wurden, einfacher und sauberer sind].
Ich denke, Ihr Ansatz ist in Ordnung. Denken Sie daran, dass ein Spark DataFrame
ein (unveränderliches) RDD von Zeilen ist. Wir werden also nie wirklich Ersetzen eine Spalte erstellen jedes Mal neu DataFrame
mit einem neuen Schema.
Angenommen, Sie haben eine Original-DF mit dem folgenden Schema:
scala> df.printSchema
root
|-- Year: string (nullable = true)
|-- Month: string (nullable = true)
|-- DayofMonth: string (nullable = true)
|-- DayOfWeek: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Distance: string (nullable = true)
|-- CRSDepTime: string (nullable = true)
Einige UDFs sind in einer oder mehreren Spalten definiert:
import org.Apache.spark.sql.functions._
val toInt = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )
val days_since_nearest_holidays = udf(
(year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
)
Das Ändern von Spaltentypen oder sogar das Erstellen eines neuen DataFrames aus einem anderen kann folgendermaßen geschrieben werden:
val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour", toHour(df("CRSDepTime")))
.withColumn("dayOfWeek", toInt(df("DayOfWeek")))
.withColumn("dayOfMonth", toInt(df("DayofMonth")))
.withColumn("month", toInt(df("Month")))
.withColumn("distance", toDouble(df("Distance")))
.withColumn("nearestHoliday", days_since_nearest_holidays(
df("Year"), df("Month"), df("DayofMonth"))
)
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
"month", "distance", "nearestHoliday")
was ergibt:
scala> df.printSchema
root
|-- departureDelay: double (nullable = true)
|-- departureHour: integer (nullable = true)
|-- dayOfWeek: integer (nullable = true)
|-- dayOfMonth: integer (nullable = true)
|-- month: integer (nullable = true)
|-- distance: double (nullable = true)
|-- nearestHoliday: integer (nullable = true)
Dies kommt Ihrer eigenen Lösung ziemlich nahe. Wenn Sie einfach die Typänderungen und andere Transformationen als separate udf val
s beibehalten, wird der Code lesbarer und wiederverwendbarer.
Da die cast
-Operation für Spark Column
verfügbar ist (und ich persönlich die von @udf
vorgeschlagenen Svend
nicht bevorzuge), wie wäre es mit:
df.select( df("year").cast(IntegerType).as("year"), ... )
auf den gewünschten Typ umwandeln? Als netter Nebeneffekt werden Werte, die in diesem Sinne nicht umwandelbar sind, zu null
.
Falls Sie dies als eine Hilfsmethode benötigen, verwenden Sie:
object DFHelper{
def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
df.withColumn( cn, df(cn).cast(tpe) )
}
}
welches verwendet wird wie:
import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
Zuerst, wenn Sie einen Typ darstellen möchten, dann ist dies:
import org.Apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))
Bei gleichem Spaltennamen wird die Spalte durch eine neue ersetzt. Sie müssen keine Schritte hinzufügen und löschen.
Zweite, ungefähr Scala vs R.
Dies ist der Code, der R am ähnlichsten ist, den ich finden kann:
val df2 = df.select(
df.columns.map {
case year @ "year" => df(year).cast(IntegerType).as(year)
case make @ "make" => functions.upper(df(make)).as(make)
case other => df(other)
}: _*
)
Die Codelänge ist jedoch etwas länger als die von R. Das hat nichts mit der Ausführlichkeit der Sprache zu tun. In R ist mutate
eine spezielle Funktion für R-Datenrahmen, während Sie in Scala eine Funktion dank ihrer Ausdruckskraft problemlos ad-hoc verwenden können.
In Word werden spezifische Lösungen vermieden, da die Grundlage ausreicht, um schnell und einfach eigene Funktionen für die Domänensprache zu erstellen.
randnotiz: df.columns
ist überraschenderweise ein Array[String]
anstelle von Array[Column]
, vielleicht möchten sie, dass es wie der Datenrahmen von Python Pandas aussieht.
Sie können selectExpr
verwenden, um es ein wenig sauberer zu machen:
df.selectExpr("cast(year as int) as year", "upper(make) as make",
"model", "comment", "blank")
Java-Code zum Ändern des Datentyps des DataFrame von String in Integer
df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
Der vorhandene Datentyp (String-Datentyp) wird einfach in Integer umgewandelt.
Um das Jahr von string in int zu konvertieren, können Sie dem csv-Reader die folgende Option hinzufügen: "inferSchema" -> "true", siehe DataBricks-Dokumentation
Das funktioniert also nur dann wirklich, wenn Sie Probleme beim Speichern auf einem JDBC-Treiber wie SQLServer haben, aber es ist wirklich hilfreich für Fehler, die Sie mit Syntax und Typen antreffen.
import org.Apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.Apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR(5000)", Java.sql.Types.VARCHAR))
case BooleanType => Some(JdbcType("BIT(1)", Java.sql.Types.BIT))
case IntegerType => Some(JdbcType("INTEGER", Java.sql.Types.INTEGER))
case LongType => Some(JdbcType("BIGINT", Java.sql.Types.BIGINT))
case DoubleType => Some(JdbcType("DOUBLE PRECISION", Java.sql.Types.DOUBLE))
case FloatType => Some(JdbcType("REAL", Java.sql.Types.REAL))
case ShortType => Some(JdbcType("INTEGER", Java.sql.Types.INTEGER))
case ByteType => Some(JdbcType("INTEGER", Java.sql.Types.INTEGER))
case BinaryType => Some(JdbcType("BINARY", Java.sql.Types.BINARY))
case TimestampType => Some(JdbcType("DATE", Java.sql.Types.DATE))
case DateType => Some(JdbcType("DATE", Java.sql.Types.DATE))
// case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", Java.sql.Types.NUMERIC))
case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", Java.sql.Types.DECIMAL))
case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
}
}
JdbcDialects.registerDialect(SQLServerDialect)
Generieren Sie einen einfachen Datensatz mit fünf Werten und konvertieren Sie int
in string
:
val df = spark.range(5).select( col("id").cast("string") )
df.select($"long_col".cast(IntegerType).as("int_col"))
bei den Antworten, die darauf hindeuten, cast zu verwenden, ist die cast-Methode in spark 1.4.1 fehlerhaft.
beispielsweise hat ein Datenrahmen mit einer Zeichenfolgenspalte, die den Wert "8182175552014127960" hat, wenn er in "bigint" umgewandelt wird, den Wert "8182175552014128100".
df.show
+-------------------+
| a|
+-------------------+
|8182175552014127960|
+-------------------+
df.selectExpr("cast(a as bigint) a").show
+-------------------+
| a|
+-------------------+
|8182175552014128100|
+-------------------+
Wir mussten uns mit vielen Problemen auseinandersetzen, bevor wir diesen Fehler fanden, weil wir große Spalten in der Produktion hatten.
Mit Spark Sql 2.4.0 können Sie das tun:
spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
Sie können den folgenden Code verwenden.
df.withColumn("year", df("year").cast(IntegerType))
Womit die Spalte Jahr in die Spalte IntegerType
konvertiert wird.
Diese Methode löscht die alte Spalte und erstellt neue Spalten mit denselben Werten und neuem Datentyp. Meine ursprünglichen Datentypen beim Erstellen des DataFrame waren:
root
|-- id: integer (nullable = true)
|-- flag1: string (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag3: string (nullable = true)
Danach habe ich folgenden Code ausgeführt, um den Datentyp zu ändern: -
df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)
Danach ergab sich folgendes Ergebnis:
root
|-- id: integer (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag1: boolean (nullable = true)
|-- flag3: boolean (nullable = true)
Wenn Sie Dutzende von Spalten umbenennen müssen, die durch ihren Namen angegeben sind, wird im folgenden Beispiel der Ansatz von @dnlbrky verwendet und auf mehrere Spalten gleichzeitig angewendet:
df.selectExpr(df.columns.map(cn => {
if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
else cn
}):_*)
Nicht gegossene Spalten bleiben unverändert. Alle Spalten bleiben in ihrer ursprünglichen Reihenfolge.
Another solution is as follows:
1) Keep "inferSchema" as False
2) While running 'Map' functions on the row, you can read 'asString' (row.getString...)
<Code>
//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
.read()
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema","false")
.load(args[0]);
JavaRDD<Box> vertices = enginesDataSet
.select("BOX","BOX_CD")
.toJavaRDD()
.map(new Function<Row, Box>() {
@Override
public Box call(Row row) throws Exception {
return new Box((String)row.getString(0),(String)row.get(1));
}
});
</Code>
Sie können den Datentyp einer Spalte ändern, indem Sie die Umwandlung in spark sql verwenden. tabellenname ist tabelle und hat nur zwei spalten spalte1 und spalte2 und spalte1 datentyp soll geändert werden. ex-spark.sql ("select cast (column1 as Double) column1NewName, column2 from table") An die Stelle von double schreiben Sie Ihren Datentyp.