wake-up-neo.com

DataFrame-Join-Optimierung - Broadcast-Hash-Join

Ich versuche effektiv, zwei DataFrames zu verbinden, von denen einer groß und der zweite etwas kleiner ist.

Gibt es eine Möglichkeit, all dieses Mischen zu vermeiden? Ich kann autoBroadCastJoinThreshold nicht festlegen, da es nur Ganzzahlen unterstützt - und die Tabelle, die ich senden möchte, ist etwas größer als die ganzzahlige Anzahl von Bytes.

Gibt es eine Möglichkeit, Broadcast zu zwingen, diese Variable zu ignorieren?

31
NNamed

Broadcast Hash Joins (ähnlich wie Map Side Join oder Map Side Combine in Mapreduce):

In SparkSQL können Sie den Typ des Joins sehen, der ausgeführt wird, indem Sie queryExecution.executedPlan Aufrufen. Wie bei Core Spark möchten Sie möglicherweise einen Broadcast-Hash-Join, wenn eine der Tabellen viel kleiner als die andere ist. Sie können Spark SQL mitteilen, dass ein gegebener DF zum Beitritt gesendet werden soll, indem Sie die Methode broadcast auf dem DataFrame aufrufen. vor dem Beitritt

Beispiel: largedataframe.join(broadcast(smalldataframe), "key")

in DWH-Begriffen, wobei ein größerer Datenrahmen wie Tatsache sein kann
smalldataframe kann sein wie dimension

Wie in meinem Lieblingsbuch (HPS) beschrieben. siehe unten zum besseren Verständnis .. enter image description here

Hinweis: Oben ist broadcast von import org.Apache.spark.sql.functions.broadcast Nicht von SparkContext

Spark verwendet automatisch den spark.sql.conf.autoBroadcastJoinThreshold, Um zu bestimmen, ob eine Tabelle gesendet werden soll.

Tipp: Siehe DataFrame.explain () -Methode

def
explain(): Unit
Prints the physical plan to the console for debugging purposes.

Gibt es eine Möglichkeit, das Ignorieren dieser Variablen zu erzwingen?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")


HINWEIS:

Ein weiteres ähnliches Out-of-Box-Note w.r.t. Bienenstock (kein Funke): Ähnliches kann mit dem Bienenstock-Hinweis MAPJOIN erreicht werden, wie unten dargestellt ...

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

Hive> set Hive.auto.convert.join=true;
Hive> set Hive.auto.convert.join.noconditionaltask.size=20971520
Hive> set Hive.auto.convert.join.noconditionaltask=true;
Hive> set Hive.auto.convert.join.use.nonstaged=true;
Hive> set Hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb

Weiterführende Literatur: Bitte beachten Sie meine Artikel über BHJ, SHJ, SMJ

67
Ram Ghadiyaram

Sie können mithilfe von left.join(broadcast(right), ...) angeben, dass ein Datenrahmen gesendet werden soll.

18
Sebastian Piu

Einstellung spark.sql.autoBroadcastJoinThreshold = -1 deaktiviert die Übertragung vollständig. Siehe Weitere Konfigurationsoptionen in Spark SQL, DataFrames and Datasets Guide .

4
Vishal Gupta

Dies ist eine Strombegrenzung des Funkens, siehe SPARK-6235 . Das Limit von 2 GB gilt auch für Broadcast-Variablen.

Sind Sie sicher, dass es keine andere Möglichkeit gibt, dies zu tun, z. andere Aufteilung?

Andernfalls können Sie es umgehen, indem Sie manuell mehrere Broadcast-Variablen mit jeweils <2 GB erstellen.

3
dpeacock

Ich fand diesen Code funktioniert für Broadcast Join in Spark 2.11 Version 2.0.0.

import org.Apache.spark.sql.functions.broadcast  

val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF

// materializing the department data
val tmpDepartments = broadcast(departmentsDF.as("departments"))

import context.implicits._

employeesDF.join(broadcast(tmpDepartments), 
   $"depId" === $"id",  // join by employees.depID == departments.id 
   "inner").show()

Hier ist die Referenz für den obigen Code Henning Kropp Blog, Broadcast Join with Spark

1
SparkleGoat