wake-up-neo.com

Wie ändere ich Spaltentypen im DataFrame von Spark SQL?

Angenommen, ich mache etwas wie:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...  

aber ich wollte das year wirklich als Int (und vielleicht einige andere Spalten transformieren).

Das Beste, was ich mir einfallen lassen konnte, ist

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.Apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

das ist ein bisschen gewunden.

Ich komme aus R und bin es gewohnt, schreiben zu können, z.

df2 <- df %>%
   mutate(year = year %>% as.integer, 
          make = make %>% toupper)

Ich vermisse wahrscheinlich etwas, da es einen besseren Weg geben sollte, dies in spark/scala zu tun ...

136
kevinykuo

Edit: Neueste Version

Seit spark 2.x können Sie .withColumn verwenden. Überprüfen Sie die Dokumente hier:

https://spark.Apache.org/docs/latest/api/scala/index.html#[email protected] (colName: String, col: org.Apache.spark. sql.Column): org.Apache.spark.sql.DataFrame

Älteste Antwort

Seit Spark Version 1.4 können Sie die Umwandlungsmethode mit DataType auf die Spalte anwenden:

import org.Apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

Wenn Sie SQL-Ausdrücke verwenden, können Sie auch Folgendes tun:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

Weitere Informationen finden Sie in den Dokumenten: http://spark.Apache.org/docs/1.6.0/api/scala/#org.Apache.spark.sql.DataFrame

130
msemelman

[EDIT: März 2016: Danke für die Stimmen! Obwohl dies wirklich nicht die beste Antwort ist, denke ich, dass die Lösungen, die auf withColumn, withColumnRenamed und cast basieren, die von msemelman, Martin Senne und anderen vorgeschlagen wurden, einfacher und sauberer sind].

Ich denke, Ihr Ansatz ist in Ordnung. Denken Sie daran, dass ein Spark DataFrame ein (unveränderliches) RDD von Zeilen ist. Wir werden also nie wirklich Ersetzen eine Spalte erstellen jedes Mal neu DataFrame mit einem neuen Schema.

Angenommen, Sie haben eine Original-DF mit dem folgenden Schema:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

Einige UDFs sind in einer oder mehreren Spalten definiert:

import org.Apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

Das Ändern von Spaltentypen oder sogar das Erstellen eines neuen DataFrames aus einem anderen kann folgendermaßen geschrieben werden:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

was ergibt:

scala> df.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

Dies kommt Ihrer eigenen Lösung ziemlich nahe. Wenn Sie einfach die Typänderungen und andere Transformationen als separate udf vals beibehalten, wird der Code lesbarer und wiederverwendbarer.

87
Svend

Da die cast -Operation für Spark Column verfügbar ist (und ich persönlich die von @udf vorgeschlagenen Svend nicht bevorzuge), wie wäre es mit:

df.select( df("year").cast(IntegerType).as("year"), ... )

auf den gewünschten Typ umwandeln? Als netter Nebeneffekt werden Werte, die in diesem Sinne nicht umwandelbar sind, zu null.

Falls Sie dies als eine Hilfsmethode benötigen, verwenden Sie:

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}

welches verwendet wird wie:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
60
Martin Senne

Zuerst, wenn Sie einen Typ darstellen möchten, dann ist dies:

import org.Apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

Bei gleichem Spaltennamen wird die Spalte durch eine neue ersetzt. Sie müssen keine Schritte hinzufügen und löschen.

Zweite, ungefähr Scala vs R.
Dies ist der Code, der R am ähnlichsten ist, den ich finden kann:

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

Die Codelänge ist jedoch etwas länger als die von R. Das hat nichts mit der Ausführlichkeit der Sprache zu tun. In R ist mutate eine spezielle Funktion für R-Datenrahmen, während Sie in Scala eine Funktion dank ihrer Ausdruckskraft problemlos ad-hoc verwenden können.
In Word werden spezifische Lösungen vermieden, da die Grundlage ausreicht, um schnell und einfach eigene Funktionen für die Domänensprache zu erstellen.


randnotiz: df.columns ist überraschenderweise ein Array[String] anstelle von Array[Column], vielleicht möchten sie, dass es wie der Datenrahmen von Python Pandas aussieht.

43
WeiChing Lin

Sie können selectExpr verwenden, um es ein wenig sauberer zu machen:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")
16
dnlbrky

Java-Code zum Ändern des Datentyps des DataFrame von String in Integer

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

Der vorhandene Datentyp (String-Datentyp) wird einfach in Integer umgewandelt.

10
manishbelsare

Um das Jahr von string in int zu konvertieren, können Sie dem csv-Reader die folgende Option hinzufügen: "inferSchema" -> "true", siehe DataBricks-Dokumentation

8
Peter Rose

Das funktioniert also nur dann wirklich, wenn Sie Probleme beim Speichern auf einem JDBC-Treiber wie SQLServer haben, aber es ist wirklich hilfreich für Fehler, die Sie mit Syntax und Typen antreffen.

import org.Apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.Apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", Java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", Java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", Java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", Java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", Java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", Java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", Java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", Java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", Java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", Java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", Java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", Java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", Java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)
6
ben jarman

Generieren Sie einen einfachen Datensatz mit fünf Werten und konvertieren Sie int in string:

val df = spark.range(5).select( col("id").cast("string") )
6
user8106134
df.select($"long_col".cast(IntegerType).as("int_col"))
5
soulmachine

bei den Antworten, die darauf hindeuten, cast zu verwenden, ist die cast-Methode in spark 1.4.1 fehlerhaft.

beispielsweise hat ein Datenrahmen mit einer Zeichenfolgenspalte, die den Wert "8182175552014127960" hat, wenn er in "bigint" umgewandelt wird, den Wert "8182175552014128100".

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

Wir mussten uns mit vielen Problemen auseinandersetzen, bevor wir diesen Fehler fanden, weil wir große Spalten in der Produktion hatten.

5
sauraI3h

Mit Spark Sql 2.4.0 können Sie das tun:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
4
Eric Bellet

Sie können den folgenden Code verwenden.

df.withColumn("year", df("year").cast(IntegerType))

Womit die Spalte Jahr in die Spalte IntegerType konvertiert wird.

2
adarsh

Diese Methode löscht die alte Spalte und erstellt neue Spalten mit denselben Werten und neuem Datentyp. Meine ursprünglichen Datentypen beim Erstellen des DataFrame waren:

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

Danach habe ich folgenden Code ausgeführt, um den Datentyp zu ändern: -

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

Danach ergab sich folgendes Ergebnis:

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)
2
PirateJack

Wenn Sie Dutzende von Spalten umbenennen müssen, die durch ihren Namen angegeben sind, wird im folgenden Beispiel der Ansatz von @dnlbrky verwendet und auf mehrere Spalten gleichzeitig angewendet:

df.selectExpr(df.columns.map(cn => {
    if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
    else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
    else cn
}):_*)

Nicht gegossene Spalten bleiben unverändert. Alle Spalten bleiben in ihrer ursprünglichen Reihenfolge.

1
cubic lettuce
Another solution is as follows:
1) Keep "inferSchema" as False
2) While running 'Map' functions on the row, you can read 'asString' (row.getString...)

<Code>
        //Read CSV and create dataset
        Dataset<Row> enginesDataSet = sparkSession
                    .read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .option("inferSchema","false")
                    .load(args[0]);

        JavaRDD<Box> vertices = enginesDataSet
                    .select("BOX","BOX_CD")
                    .toJavaRDD()
                    .map(new Function<Row, Box>() {
                        @Override
                        public Box call(Row row) throws Exception {
                            return new Box((String)row.getString(0),(String)row.get(1));
                        }
                    });
</Code>
0
Vibha

Sie können den Datentyp einer Spalte ändern, indem Sie die Umwandlung in spark sql verwenden. tabellenname ist tabelle und hat nur zwei spalten spalte1 und spalte2 und spalte1 datentyp soll geändert werden. ex-spark.sql ("select cast (column1 as Double) column1NewName, column2 from table") An die Stelle von double schreiben Sie Ihren Datentyp.

0
Tejasvi Sharma