wake-up-neo.com

Führen Sie einen getippten Join in Scala mit Spark-Datensätzen durch

Ich mag Spark-Datasets, da sie mir zur Kompilierzeit Analyse- und Syntaxfehler liefern und ich auch mit Getigern arbeiten kann, anstatt mit fest codierten Namen/Zahlen. Die meisten Berechnungen können mit den übergeordneten APIs von Dataset durchgeführt werden. Zum Beispiel ist es viel einfacher, agg, select, sum, avg, map, filter oder groupBy -Operationen durch Zugriff auf ein vom Dataset typisiertes Objekt auszuführen, als die Datenfelder von RDD-Zeilen.

Allerdings fehlt der Join-Vorgang, ich habe gelesen, dass ich so einen Join machen kann

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

Aber das ist nicht das, was ich möchte, da ich es am liebsten über die Fallklassenschnittstelle machen würde, also etwas ähnliches

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

Die beste Alternative für jetzt scheint, ein Objekt neben der Fallklasse zu erstellen und diesen Funktionen zu geben, um mir den richtigen Spaltennamen als String zu geben. Ich würde also die erste Codezeile verwenden, aber statt eines hart codierten Spaltennamens eine Funktion verwenden. Aber das fühlt sich nicht elegant genug an ..

Kann mir hier jemand andere Optionen empfehlen? Das Ziel ist, eine Abstraktion von den tatsächlichen Spaltennamen zu erhalten und vorzugsweise über die Getter der Fallklasse zu arbeiten.

Ich verwende Spark 1.6.1 und Scala 2.10

25
Sparky

Überwachung

Spark SQL kann den Join nur optimieren, wenn die Join-Bedingung auf dem Gleichheitsoperator basiert. Dies bedeutet, dass wir Equijoins und Nicht-Equijoins separat betrachten können.

Equijoin

Equijoin kann auf typsichere Weise implementiert werden, indem sowohl Datasets ((Schlüssel, Wert)) Tupel zugeordnet wird, ein Join auf Schlüsselbasis durchgeführt wird und das Ergebnis umgeformt wird:

import org.Apache.spark.sql.Encoder
import org.Apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}

Nicht-Equijoin

Kann mit relationalen Algebra-Operatoren als R ⋈θ S = σθ (R × S) ausgedrückt und direkt in Code umgewandelt werden.

Spark 2.0

Aktivieren Sie crossJoin und verwenden Sie joinWith mit trivial gleichem Prädikat:

spark.conf.set("spark.sql.crossJoin.enabled", true)

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}

Spark 2.1

Verwenden Sie die crossJoin-Methode:

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}

Beispiele

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)

Anmerkungen

  • Es sollte beachtet werden, dass sich diese Methoden von einer direkten joinWith-Anwendung unterscheiden und teure DeserializeToObject/SerializeFromObject-Transformationen erfordern (im Vergleich zu dieser direkten joinWith können logische Operationen mit den Daten verwendet werden). 

    Dies ähnelt dem in Spark 2.0-Datensatz vs DataFrame beschriebenen Verhalten.

  • Wenn Sie nicht auf die Spark-SQL-API beschränkt sind, bietet frameless interessante Typ-sichere Erweiterungen für Datasets (derzeit unterstützt sie nur Spark 2.0):

    import frameless.TypedDataset
    
    val typedPoints1 = TypedDataset.create(points1)
    val typedPoints2 = TypedDataset.create(points2)
    
    typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
    
  • Dataset API ist in 1.6 nicht stabil, daher halte ich es nicht für sinnvoll, es dort zu verwenden.

  • Natürlich sind dieses Design und die beschreibenden Namen nicht notwendig. Sie können die Typklasse problemlos verwenden, um diese Methoden implizit zu Dataset hinzuzufügen, und es besteht kein Konflikt mit eingebauten Signaturen, sodass beide als joinWith bezeichnet werden können.

25
user6910411

Ein weiteres größeres Problem für die nicht sichere Spark-API besteht darin, dass Sie, wenn Sie zwei Datasets hinzufügen, eine DataFrame erhalten. Und dann verlieren Sie Typen von Ihren ursprünglichen zwei Datensätzen. 

val a: Dataset[A]
val b: Dataset[B]

val joined: Dataframe = a.join(b)
// what would be great is 
val joined: Dataset[C] = a.join(b)(implicit func: (A, B) => C)
0
linehrr