Ich habe eine Textdatei in HDFS und möchte sie in Spark in einen Datenrahmen konvertieren.
Ich verwende den Spark-Kontext, um die Datei zu laden, und versuche dann, einzelne Spalten aus dieser Datei zu generieren.
val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))
Danach versuche ich die folgende Operation.
myFile1.toDF()
Ich erhalte Probleme, da die Elemente in myFile1 RDD jetzt vom Array-Typ sind.
Wie kann ich dieses Problem lösen?
Update - ab Spark 1.6 können Sie einfach die integrierte CSV-Datenquelle verwenden:
spark: SparkSession = // create the Spark Session
val df = spark.read.csv("file.txt")
Sie können auch verschiedene Optionen zur Steuerung der CSV-Analyse verwenden, z.
val df = spark.read.option("header", "false").csv("file.txt")
Für Spark-Version <1.6: Am einfachsten verwenden Sie spark-csv - fügen Sie es in Ihre Abhängigkeiten ein und folgen Sie der README-Datei, um ein benutzerdefiniertes Trennzeichen (;
) festzulegen. , kann CSV-Header lesen (falls vorhanden) und das Schema types ableiten (mit den Kosten eines zusätzlichen Scans der Daten).
Wenn Sie das Schema kennen, können Sie alternativ eine Fallklasse erstellen, die es darstellt, und Ihre RDD-Elemente in Instanzen dieser Klasse abbilden, bevor sie in einen DataFrame umgewandelt werden, z.
case class Record(id: Int, name: String)
val myFile1 = myFile.map(x=>x.split(";")).map {
case Array(id, name) => Record(id.toInt, name)
}
myFile1.toDF() // DataFrame will have columns "id" and "name"
Ich habe verschiedene Möglichkeiten zum Erstellen von DataFrame aus einer Textdatei angegeben
val conf = new SparkConf().setAppName(appName).setMaster("local")
val sc = SparkContext(conf)
val file = sc.textFile("C:\\vikas\\spark\\Interview\\text.txt")
val fileToDf = file.map(_.split(",")).map{case Array(a,b,c) =>
(a,b.toInt,c)}.toDF("name","age","city")
fileToDf.foreach(println(_))
import org.Apache.spark.sql.SparkSession
val sparkSess =
SparkSession.builder().appName("SparkSessionZipsExample")
.config(conf).getOrCreate()
val df = sparkSess.read.option("header",
"false").csv("C:\\vikas\\spark\\Interview\\text.txt")
df.show()
import org.Apache.spark.sql.types._
val schemaString = "name age city"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,
StringType, nullable=true))
val schema = StructType(fields)
val dfWithSchema = sparkSess.read.option("header",
"false").schema(schema).csv("C:\\vikas\\spark\\Interview\\text.txt")
dfWithSchema.show()
import org.Apache.spark.sql.SQLContext
val fileRdd =
sc.textFile("C:\\vikas\\spark\\Interview\\text.txt").map(_.split(",")).map{x
=> org.Apache.spark.sql.Row(x:_*)}
val sqlDf = sqlCtx.createDataFrame(fileRdd,schema)
sqlDf.show()
Wenn Sie die toDF
-Methode verwenden möchten, müssen Sie Ihre RDD
von Array[String]
in eine RDD
einer Fallklasse konvertieren. Zum Beispiel müssen Sie Folgendes tun:
case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
val df = spark.read.textFile("abc.txt")
case class Abc (amount:Int, types: String, id:Int) //columns and data types
val df2 = df.map(rec=>Amount(rec(0).toInt, rec(1), rec(2).toInt))
rdd2.printSchema
root
|-- amount: integer (nullable = true)
|-- types: string (nullable = true)
|-- id: integer (nullable = true)
Sie können es nicht in einen Datenrahmen konvertieren, bevor Sie die implizite Konvertierung verwenden.
val sqlContext = new SqlContext(new SparkContext())
import sqlContext.implicits._
Danach können Sie dies nur noch in einen Datenrahmen konvertieren
case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
Ich weiß, dass ich ziemlich spät komme, um dies zu beantworten, aber ich habe mir eine andere Antwort ausgedacht:
val rdd = sc.textFile("/home/training/mydata/file.txt")
val text = rdd.map(lines=lines.split(",")).map(arrays=>(ararys(0),arrays(1))).toDF("id","name").show
Sie können eine Datei mit einer RDD lesen und ihr dann ein Schema zuweisen. Zwei gängige Methoden zum Erstellen eines Schemas sind entweder eine Fallklasse oder ein Schema-Objekt [mein bevorzugtes Objekt]. Folgt den kurzen Codeausschnitten, die Sie verwenden können.
Case Class Ansatz
case class Test(id:String,name:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
Schemaansatz
import org.Apache.spark.sql.types._
val schemaString = "id name"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields)
val dfWithSchema = sparkSess.read.option("header","false").schema(schema).csv("file.txt")
dfWithSchema.show()
Der zweite ist mein bevorzugter Ansatz, da in der Fallklasse maximal 22 Felder festgelegt sind. Dies ist ein Problem, wenn Ihre Datei mehr als 22 Felder enthält!