wake-up-neo.com

Wie definiere ich die Partitionierung von DataFrame?

Ich habe angefangen, Spark SQL und DataFrames in Spark 1.4.0 zu verwenden. Ich möchte einen benutzerdefinierten Partitionierer für DataFrames in Scala definieren, aber nicht sehen, wie das geht.

Eine der Datentabellen, mit denen ich arbeite, enthält eine Liste von Transaktionen nach Konto, wie im folgenden Beispiel dargestellt.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Zumindest anfänglich werden die meisten Berechnungen zwischen den Transaktionen innerhalb eines Kontos durchgeführt. Ich möchte also, dass die Daten so partitioniert werden, dass alle Transaktionen für ein Konto in derselben Spark Partition sind.

Aber ich sehe keinen Weg, dies zu definieren. Die DataFrame-Klasse verfügt über eine Methode mit dem Namen 'repartition (Int)', mit der Sie die Anzahl der zu erstellenden Partitionen angeben können. Es ist jedoch keine Methode verfügbar, um einen benutzerdefinierten Partitionierer für einen DataFrame zu definieren, wie er für eine RDD angegeben werden kann.

Die Quelldaten werden in Parkett gespeichert. Ich habe festgestellt, dass Sie beim Schreiben eines DataFrame in Parquet eine Spalte angeben können, nach der partitioniert werden soll. Vermutlich kann ich Parquet dann anweisen, die Daten nach der Spalte "Konto" zu partitionieren. Aber es könnte Millionen von Konten geben, und wenn ich Parquet richtig verstehe, würde es für jedes Konto ein eigenes Verzeichnis erstellen, sodass dies nicht nach einer vernünftigen Lösung klang.

Gibt es eine Möglichkeit, Spark um diesen DataFrame so zu partitionieren, dass sich alle Daten für ein Konto in derselben Partition befinden?

115
rake

Funke> = 2.3.0

SPARK-22614 macht die Bereichspartitionierung verfügbar.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 macht die Partitionierung des externen Formats in Data Source API v2 verfügbar.

Funken> = 1.6.0

In Spark> = 1.6 ist es möglich, Spaltenpartitionierung für Abfrage und Zwischenspeicherung zu verwenden. Siehe: SPARK-1141 und SPARK-4849 mit repartition Methode:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

Im Gegensatz zu RDDs Spark Dataset (einschließlich Dataset[Row] Aka DataFrame) kann derzeit kein benutzerdefinierter Partitionierer verwendet werden Beheben Sie dies, indem Sie eine künstliche Partitionierungssäule erstellen, die Ihnen jedoch nicht die gleiche Flexibilität bietet.

Funken <1.6.0:

Sie können Eingabedaten vorab partitionieren, bevor Sie ein DataFrame erstellen.

import org.Apache.spark.sql.types._
import org.Apache.spark.sql.Row
import org.Apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Da die Erstellung von DataFrame aus einem RDD nur eine einfache Kartenphase erfordert, sollte das vorhandene Partitionslayout beibehalten werden *:

assert(df.rdd.partitions == partitioned.partitions)

Auf die gleiche Weise können Sie vorhandene DataFrame neu partitionieren:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Es sieht also so aus, als ob es nicht unmöglich ist. Die Frage bleibt, ob es überhaupt Sinn macht. Ich werde argumentieren, dass es meistens nicht so ist:

  1. Repartitionierung ist ein teurer Prozess. In einem typischen Szenario müssen die meisten Daten serialisiert, gemischt und deserialisiert werden. Andererseits ist die Anzahl der Vorgänge, die von vorpartitionierten Daten profitieren können, relativ gering und weiter begrenzt, wenn die interne API nicht darauf ausgelegt ist, diese Eigenschaft zu nutzen.

    • beitritt in einigen Szenarien, aber es würde eine interne Unterstützung erfordern,
    • fensterfunktionen ruft mit passendem Partitionierer auf. Wie oben, beschränkt auf eine einzelne Fensterdefinition. Es ist jedoch bereits intern partitioniert, sodass die Vor-Partitionierung redundant sein kann.
    • einfache aggregationen mit GROUP BY - es ist möglich, den speicherbedarf der temporären puffer zu reduzieren **, aber die gesamtkosten sind viel höher. Mehr oder weniger äquivalent zu groupByKey.mapValues(_.reduce) (aktuelles Verhalten) vs reduceByKey (Vor-Partitionierung). In der Praxis wahrscheinlich nicht nützlich.
    • datenkomprimierung mit SqlContext.cacheTable. Da anscheinend die Lauflängencodierung verwendet wird, kann durch Anwenden von OrderedRDDFunctions.repartitionAndSortWithinPartitions Die Komprimierungsrate verbessert werden.
  2. Die Leistung hängt stark von der Verteilung der Schlüssel ab. Wenn es schief ist, führt dies zu einer suboptimalen Ressourcennutzung. Im schlimmsten Fall ist es unmöglich, den Job überhaupt zu beenden.

  3. Bei der Verwendung einer deklarativen API auf hoher Ebene müssen Sie sich von den Implementierungsdetails auf niedriger Ebene isolieren. Wie bereits von @ dwysakowicz und @ RomiKuntsman erwähnt, ist eine Optimierung eine Aufgabe des Catalyst Optimizer . Es ist ein ziemlich raffiniertes Biest und ich bezweifle wirklich, dass Sie es leicht verbessern können, ohne viel tiefer in seine inneren Strukturen einzutauchen.

Verwandte konzepte

Partitionierung mit JDBC-Quellen :

JDBC-Datenquellen unterstützen predicates Argument . Es kann wie folgt verwendet werden:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Es wird eine einzelne JDBC-Partition pro Prädikat erstellt. Beachten Sie, dass in der resultierenden Tabelle Duplikate angezeigt werden, wenn mit einzelnen Prädikaten erstellte Mengen nicht disjunkt sind.

partitionBy Methode in DataFrameWriter:

Spark DataFrameWriter bietet die Methode partitionBy, mit der Daten beim Schreiben "partitioniert" werden können. Es trennt die Daten beim Schreiben anhand der bereitgestellten Spalten

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Dies aktiviert das Prädikat Push-down beim Lesen für Abfragen basierend auf dem Schlüssel:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

aber es ist nicht gleichbedeutend mit DataFrame.repartition. Insbesondere Aggregationen wie:

val cnts = df1.groupBy($"k").sum()

benötigt noch TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy Methode in DataFrameWriter (Spark> = 2.0):

bucketBy hat ähnliche Anwendungen wie partitionBy, ist jedoch nur für Tabellen verfügbar (saveAsTable). Bucketing-Informationen können zur Optimierung von Joins verwendet werden:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Mit Partitionslayout meine ich nur eine Datenverteilung. partitioned RDD hat keinen Partitionierer mehr. ** Vorausgesetzt keine frühe Projektion. Wenn die Aggregation nur eine kleine Teilmenge der Spalten abdeckt, ist wahrscheinlich überhaupt kein Gewinn zu verzeichnen.

166
zero323

In Spark <1.6 Wenn Sie ein HiveContext erstellen, nicht das einfache alte SqlContext, können Sie das HiveQLDISTRIBUTE BY colX... (stellt sicher, dass jeder von N Reduzierern nicht überlappende Bereiche von x erhält) & CLUSTER BY colX... (Abkürzung für Verteilen nach und Sortieren nach) zum Beispiel;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

Nicht sicher, wie dies mit Spark DF= api. Diese Schlüsselwörter werden im normalen SqlContext nicht unterstützt (beachten Sie, dass Sie keinen Hive-Metaspeicher benötigen) den HiveContext benutzen)

BEARBEITEN: Spark 1.6+ hat dies jetzt in der nativen DataFrame-API

11
NightWolf

Verwenden Sie den DataFrame, der zurückgegeben wird von:

yourDF.orderBy(account)

Es gibt keine explizite Möglichkeit, partitionBy für einen DataFrame zu verwenden, nur für ein PairRDD. Wenn Sie jedoch einen DataFrame sortieren, wird dies in seinem LogicalPlan verwendet, und dies hilft, wenn Sie Berechnungen für jedes Konto durchführen müssen.

Ich bin gerade auf dasselbe Problem gestoßen, nämlich einen Datenrahmen, den ich nach Konto partitionieren möchte. Ich gehe davon aus, dass Sie, wenn Sie "möchten, dass die Daten so partitioniert werden, dass sich alle Transaktionen für ein Konto in derselben Partition befinden Spark Partition)", diese für die Skalierung und Leistung, aber für Ihren Code wünschen hängt nicht davon ab (wie die Verwendung von mapPartitions() etc), oder?

7
Romi Kuntsman

Ich konnte dies mit RDD tun. Aber ich weiß nicht, ob dies für Sie eine akzeptable Lösung ist. Sobald Sie DF als RDD verfügbar haben, können Sie repartitionAndSortWithinPartitions anwenden, um eine benutzerdefinierte Neupartitionierung von Daten durchzuführen.

Hier ist ein Beispiel, das ich verwendet habe:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
5
Developer

Also, um mit einer Antwort zu beginnen:) - Das kannst du nicht

Ich bin kein Experte, aber soweit ich DataFrames verstehe, sind sie nicht gleichbedeutend mit rdd und DataFrame hat keinen Partitionierer.

Im Allgemeinen ist es die Idee von DataFrame, eine andere Abstraktionsebene bereitzustellen, die solche Probleme selbst handhabt. Die Abfragen in DataFrame werden in einen logischen Plan übersetzt, der in Operationen auf RDDs weiter umgesetzt wird. Die von Ihnen vorgeschlagene Partitionierung wird wahrscheinlich automatisch angewendet oder sollte es zumindest sein.

Wenn Sie SparkSQL nicht vertrauen, dass es eine Art optimalen Job liefert, können Sie DataFrame immer in RDD [Row] umwandeln, wie in den Kommentaren vorgeschlagen.

5