Ich habe eine Spark-Streaming-Anwendung, die Parkettdaten aus Stream schreibt.
sqlContext.sql(
"""
|select
|to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
|hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
|*
|from events
| where at >= 1473667200
""".stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)
dieser Code läuft jede Stunde, aber mit der Zeit hat sich das Schreiben auf Parkett verlangsamt. Als wir anfingen, dauerte es 15 Minuten, um Daten zu schreiben, jetzt dauert es 40 Minuten. Es erfordert Zeit, proportional zu den auf diesem Pfad vorhandenen Daten zu sein. Ich habe versucht, dieselbe Anwendung an einem anderen Ort auszuführen, und das läuft schnell.
Ich habe schemaMerge- und Zusammenfassungsmetadaten deaktiviert:
sparkConf.set("spark.sql.Hive.convertMetastoreParquet.mergeSchema","false")
sparkConf.set("parquet.enable.summary-metadata","false")
mit funken 2.0
batch-Ausführung: leeres Verzeichnis
Verzeichnis mit 350 Ordnern
Ich bin auf dieses Problem gestoßen. Der Anfügemodus ist wahrscheinlich der Täter, da das Finden der Anhängeposition mehr und mehr Zeit in Anspruch nimmt, wenn die Größe Ihrer Parkettdatei wächst.
Eine Problemumgehung, die ich gefunden habe, besteht darin, den Ausgabepfad regelmäßig zu ändern. Das Zusammenführen und Umordnen der Daten aus allen Ausgabedatenrahmen ist dann normalerweise kein Problem.
def appendix: String = ((time.milliseconds - timeOrigin) / (3600 * 1000)).toString
df.write.mode(SaveMode.Append).format("parquet").save(s"${outputPath}-H$appendix")
Versuchen Sie, das Dataframe in EMR HDFS (hdfs: // ...) zu schreiben, und verwenden Sie dann s3-dist-cp, um die Daten von HDFS nach S3 hochzuladen. Arbeitete für mich.
Es könnte sein, dass der Modus angehängt wird. In diesem Modus sollten neue Dateien mit unterschiedlichen Namen aus bereits vorhandenen Dateien erstellt werden. Spark listet also jedes Mal Dateien in S3 (was langsam ist) auf.
Wir haben parquet.enable.summary-metadata auch etwas anders eingestellt:
javaSparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");