Ich habe angefangen, Spark SQL und DataFrames in Spark 1.4.0 zu verwenden. Ich möchte einen benutzerdefinierten Partitionierer für DataFrames in Scala definieren, aber nicht sehen, wie das geht.
Eine der Datentabellen, mit denen ich arbeite, enthält eine Liste von Transaktionen nach Konto, wie im folgenden Beispiel dargestellt.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Zumindest anfänglich werden die meisten Berechnungen zwischen den Transaktionen innerhalb eines Kontos durchgeführt. Ich möchte also, dass die Daten so partitioniert werden, dass alle Transaktionen für ein Konto in derselben Spark Partition sind.
Aber ich sehe keinen Weg, dies zu definieren. Die DataFrame-Klasse verfügt über eine Methode mit dem Namen 'repartition (Int)', mit der Sie die Anzahl der zu erstellenden Partitionen angeben können. Es ist jedoch keine Methode verfügbar, um einen benutzerdefinierten Partitionierer für einen DataFrame zu definieren, wie er für eine RDD angegeben werden kann.
Die Quelldaten werden in Parkett gespeichert. Ich habe festgestellt, dass Sie beim Schreiben eines DataFrame in Parquet eine Spalte angeben können, nach der partitioniert werden soll. Vermutlich kann ich Parquet dann anweisen, die Daten nach der Spalte "Konto" zu partitionieren. Aber es könnte Millionen von Konten geben, und wenn ich Parquet richtig verstehe, würde es für jedes Konto ein eigenes Verzeichnis erstellen, sodass dies nicht nach einer vernünftigen Lösung klang.
Gibt es eine Möglichkeit, Spark um diesen DataFrame so zu partitionieren, dass sich alle Daten für ein Konto in derselben Partition befinden?
SPARK-22614 macht die Bereichspartitionierung verfügbar.
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
SPARK-22389 macht die Partitionierung des externen Formats in Data Source API v2 verfügbar.
In Spark> = 1.6 ist es möglich, Spaltenpartitionierung für Abfrage und Zwischenspeicherung zu verwenden. Siehe: SPARK-1141 und SPARK-4849 mit repartition
Methode:
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
Im Gegensatz zu RDDs
Spark Dataset
(einschließlich Dataset[Row]
Aka DataFrame
) kann derzeit kein benutzerdefinierter Partitionierer verwendet werden Beheben Sie dies, indem Sie eine künstliche Partitionierungssäule erstellen, die Ihnen jedoch nicht die gleiche Flexibilität bietet.
Sie können Eingabedaten vorab partitionieren, bevor Sie ein DataFrame
erstellen.
import org.Apache.spark.sql.types._
import org.Apache.spark.sql.Row
import org.Apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
Da die Erstellung von DataFrame
aus einem RDD
nur eine einfache Kartenphase erfordert, sollte das vorhandene Partitionslayout beibehalten werden *:
assert(df.rdd.partitions == partitioned.partitions)
Auf die gleiche Weise können Sie vorhandene DataFrame
neu partitionieren:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
Es sieht also so aus, als ob es nicht unmöglich ist. Die Frage bleibt, ob es überhaupt Sinn macht. Ich werde argumentieren, dass es meistens nicht so ist:
Repartitionierung ist ein teurer Prozess. In einem typischen Szenario müssen die meisten Daten serialisiert, gemischt und deserialisiert werden. Andererseits ist die Anzahl der Vorgänge, die von vorpartitionierten Daten profitieren können, relativ gering und weiter begrenzt, wenn die interne API nicht darauf ausgelegt ist, diese Eigenschaft zu nutzen.
GROUP BY
- es ist möglich, den speicherbedarf der temporären puffer zu reduzieren **, aber die gesamtkosten sind viel höher. Mehr oder weniger äquivalent zu groupByKey.mapValues(_.reduce)
(aktuelles Verhalten) vs reduceByKey
(Vor-Partitionierung). In der Praxis wahrscheinlich nicht nützlich.SqlContext.cacheTable
. Da anscheinend die Lauflängencodierung verwendet wird, kann durch Anwenden von OrderedRDDFunctions.repartitionAndSortWithinPartitions
Die Komprimierungsrate verbessert werden.Die Leistung hängt stark von der Verteilung der Schlüssel ab. Wenn es schief ist, führt dies zu einer suboptimalen Ressourcennutzung. Im schlimmsten Fall ist es unmöglich, den Job überhaupt zu beenden.
Partitionierung mit JDBC-Quellen :
JDBC-Datenquellen unterstützen predicates
Argument . Es kann wie folgt verwendet werden:
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
Es wird eine einzelne JDBC-Partition pro Prädikat erstellt. Beachten Sie, dass in der resultierenden Tabelle Duplikate angezeigt werden, wenn mit einzelnen Prädikaten erstellte Mengen nicht disjunkt sind.
partitionBy
Methode in DataFrameWriter
:
Spark DataFrameWriter
bietet die Methode partitionBy
, mit der Daten beim Schreiben "partitioniert" werden können. Es trennt die Daten beim Schreiben anhand der bereitgestellten Spalten
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
Dies aktiviert das Prädikat Push-down beim Lesen für Abfragen basierend auf dem Schlüssel:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
aber es ist nicht gleichbedeutend mit DataFrame.repartition
. Insbesondere Aggregationen wie:
val cnts = df1.groupBy($"k").sum()
benötigt noch TungstenExchange
:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy
Methode in DataFrameWriter
(Spark> = 2.0):
bucketBy
hat ähnliche Anwendungen wie partitionBy
, ist jedoch nur für Tabellen verfügbar (saveAsTable
). Bucketing-Informationen können zur Optimierung von Joins verwendet werden:
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
* Mit Partitionslayout meine ich nur eine Datenverteilung. partitioned
RDD hat keinen Partitionierer mehr. ** Vorausgesetzt keine frühe Projektion. Wenn die Aggregation nur eine kleine Teilmenge der Spalten abdeckt, ist wahrscheinlich überhaupt kein Gewinn zu verzeichnen.
In Spark <1.6 Wenn Sie ein HiveContext
erstellen, nicht das einfache alte SqlContext
, können Sie das HiveQLDISTRIBUTE BY colX...
(stellt sicher, dass jeder von N Reduzierern nicht überlappende Bereiche von x erhält) & CLUSTER BY colX...
(Abkürzung für Verteilen nach und Sortieren nach) zum Beispiel;
df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
Nicht sicher, wie dies mit Spark DF= api. Diese Schlüsselwörter werden im normalen SqlContext nicht unterstützt (beachten Sie, dass Sie keinen Hive-Metaspeicher benötigen) den HiveContext benutzen)
BEARBEITEN: Spark 1.6+ hat dies jetzt in der nativen DataFrame-API
Verwenden Sie den DataFrame, der zurückgegeben wird von:
yourDF.orderBy(account)
Es gibt keine explizite Möglichkeit, partitionBy
für einen DataFrame zu verwenden, nur für ein PairRDD. Wenn Sie jedoch einen DataFrame sortieren, wird dies in seinem LogicalPlan verwendet, und dies hilft, wenn Sie Berechnungen für jedes Konto durchführen müssen.
Ich bin gerade auf dasselbe Problem gestoßen, nämlich einen Datenrahmen, den ich nach Konto partitionieren möchte. Ich gehe davon aus, dass Sie, wenn Sie "möchten, dass die Daten so partitioniert werden, dass sich alle Transaktionen für ein Konto in derselben Partition befinden Spark Partition)", diese für die Skalierung und Leistung, aber für Ihren Code wünschen hängt nicht davon ab (wie die Verwendung von mapPartitions()
etc), oder?
Ich konnte dies mit RDD tun. Aber ich weiß nicht, ob dies für Sie eine akzeptable Lösung ist. Sobald Sie DF als RDD verfügbar haben, können Sie repartitionAndSortWithinPartitions
anwenden, um eine benutzerdefinierte Neupartitionierung von Daten durchzuführen.
Hier ist ein Beispiel, das ich verwendet habe:
class DatePartitioner(partitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
val start_time: Long = key.asInstanceOf[Long]
Objects.hash(Array(start_time)) % partitions
}
override def numPartitions: Int = partitions
}
myRDD
.repartitionAndSortWithinPartitions(new DatePartitioner(24))
.map { v => v._2 }
.toDF()
.write.mode(SaveMode.Overwrite)
Also, um mit einer Antwort zu beginnen:) - Das kannst du nicht
Ich bin kein Experte, aber soweit ich DataFrames verstehe, sind sie nicht gleichbedeutend mit rdd und DataFrame hat keinen Partitionierer.
Im Allgemeinen ist es die Idee von DataFrame, eine andere Abstraktionsebene bereitzustellen, die solche Probleme selbst handhabt. Die Abfragen in DataFrame werden in einen logischen Plan übersetzt, der in Operationen auf RDDs weiter umgesetzt wird. Die von Ihnen vorgeschlagene Partitionierung wird wahrscheinlich automatisch angewendet oder sollte es zumindest sein.
Wenn Sie SparkSQL nicht vertrauen, dass es eine Art optimalen Job liefert, können Sie DataFrame immer in RDD [Row] umwandeln, wie in den Kommentaren vorgeschlagen.