Ich möchte eine CSV-Datei in Spark lesen und als DataFrame konvertieren und mit df.registerTempTable("table_name")
in HDFS speichern.
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.Java:418)
at org.Apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.Apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.Java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.Java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.Java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.Java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.Java:107)
Was ist der richtige Befehl, um CSV-Dateien als DataFrame in Apache Spark zu laden?
spark-csv ist Teil der Kernfunktionalität von Spark und erfordert keine separate Bibliothek. Sie könnten es zum Beispiel tun
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
Zuerst initialisieren Sie SparkSession
object standardmäßig ist es in Shells als spark
val spark = org.Apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
Verwenden Sie eine der folgenden Methoden, um CSV als
DataFrame/DataSet
zu laden.
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
Abhängigkeiten:
"org.Apache.spark" % "spark-core_2.11" % 2.0.0,
"org.Apache.spark" % "spark-sql_2.11" % 2.0.0,
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
Abhängigkeiten:
"org.Apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
Es ist für wessen Hadoop 2.6 und Spark 1.6 und ohne "Databricks" -Paket.
import org.Apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.Apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
Mit Spark 2.0 können Sie CSV wie folgt lesen
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)
In Java 1.8 Dieses Code-Snippet eignet sich hervorragend zum Lesen von CSV-Dateien
POM.xml
<dependency>
<groupId>org.Apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.Apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.Apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
Java
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
Pennys Spark 2 ist der Weg, um es in spark2 zu tun. Es gibt noch einen weiteren Trick: Lassen Sie diesen Header durch einen ersten Scan der Daten generieren, indem Sie die Option inferSchema
auf true
setzen.
Unter der Annahme, dass spark
eine von Ihnen eingerichtete Funksitzung ist, ist dies der Vorgang zum Laden aller Landsat-Images, die Amazon Host auf S3 hostet, in die CSV-Indexdatei.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.Apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
Die schlechte Nachricht ist: Dies löst einen Scan durch die Datei aus; Für etwas Großes wie diese komprimierte CSV-Datei mit 20 MB, die 30 Sekunden über eine Langstreckenverbindung dauern kann. Bedenken Sie Folgendes: Sie sollten das Schema manuell programmieren, sobald Sie es erhalten haben.
(Code-Snippet Apache Software License 2.0, lizenziert, um jegliche Mehrdeutigkeit zu vermeiden; etwas, das ich als Demo/Integrationstest der S3-Integration durchgeführt habe)
Lädt eine CSV-Datei und gibt das Ergebnis als DataFrame zurück.
df=sparksession.read.option("header", true).csv("file_name.csv")
Dataframe hat eine Datei als CSV-Format behandelt.
Falls Sie ein Glas mit Scala 2.11 und Apache 2.0 oder höher bauen.
Es ist nicht erforderlich, ein sqlContext
- oder sparkContext
-Objekt zu erstellen. Ein SparkSession
-Objekt genügt für alle Anforderungen.
Folgendes ist mein Code, der gut funktioniert:
import org.Apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.Apache.log4j.{Level, LogManager, Logger}
object driver {
def main(args: Array[String]) {
val log = LogManager.getRootLogger
log.info("**********JAR EXECUTION STARTED**********")
val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter","|")
.option("inferSchema","true")
.load("d:/small_projects/spark/test.pos")
df.show()
}
}
Falls Sie in einem Cluster arbeiten, ändern Sie einfach .master("local")
in .master("yarn")
, während Sie das sparkBuilder
-Objekt definieren
Der Spark-Doc behandelt das: https://spark.Apache.org/docs/2.2.0/sql-programming-guide.html
Versuchen Sie dies, wenn Sie spark 2.0+ verwenden
For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")
For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")
For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
Hinweis: - Dies funktioniert für jede begrenzte Datei. Verwenden Sie einfach die Option ("Trennzeichen"), um den Wert zu ändern.
Hoffe das ist hilfreich.
Das Analysieren einer CSV-Datei birgt viele Herausforderungen. Sie wird immer größer, wenn die Dateigröße größer ist. Wenn nicht-englische/escape/separator/andere Zeichen in den Spaltenwerten vorhanden sind, kann dies zu Fehlern beim Analysieren führen.
Die Magie liegt dann in den verwendeten Optionen. Diejenigen, die für mich und die Hoffnung funktionierten, sollten die meisten Edge-Fälle abdecken.
### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
### Note the options that are used. You may have to Tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep=',',
quote='"',
escape='"',
maxColumns=2,
inferSchema=True)
Hoffentlich hilft das. Weitere Informationen finden Sie hier: Verwenden von PySpark 2 zum Lesen von CSV mit HTML-Quellcode
Hinweis: Der obige Code stammt von Spark 2 API, wo die CSV-API zum Lesen von Dateien mit integrierten Paketen von Spark installiert wird.
Hinweis: PySpark ist ein Python-Wrapper für Spark und verwendet dieselbe API wie Scala/Java.
Das Standarddateiformat ist Parquet mit spark.read .. und das Lesen der Datei in csv, weshalb Sie die Ausnahme erhalten. Geben Sie das CSV-Format mit der API an, die Sie verwenden möchten