Ich mag Spark-Datasets, da sie mir zur Kompilierzeit Analyse- und Syntaxfehler liefern und ich auch mit Getigern arbeiten kann, anstatt mit fest codierten Namen/Zahlen. Die meisten Berechnungen können mit den übergeordneten APIs von Dataset durchgeführt werden. Zum Beispiel ist es viel einfacher, agg, select, sum, avg, map, filter oder groupBy -Operationen durch Zugriff auf ein vom Dataset typisiertes Objekt auszuführen, als die Datenfelder von RDD-Zeilen.
Allerdings fehlt der Join-Vorgang, ich habe gelesen, dass ich so einen Join machen kann
ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
Aber das ist nicht das, was ich möchte, da ich es am liebsten über die Fallklassenschnittstelle machen würde, also etwas ähnliches
ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
Die beste Alternative für jetzt scheint, ein Objekt neben der Fallklasse zu erstellen und diesen Funktionen zu geben, um mir den richtigen Spaltennamen als String zu geben. Ich würde also die erste Codezeile verwenden, aber statt eines hart codierten Spaltennamens eine Funktion verwenden. Aber das fühlt sich nicht elegant genug an ..
Kann mir hier jemand andere Optionen empfehlen? Das Ziel ist, eine Abstraktion von den tatsächlichen Spaltennamen zu erhalten und vorzugsweise über die Getter der Fallklasse zu arbeiten.
Ich verwende Spark 1.6.1 und Scala 2.10
Spark SQL kann den Join nur optimieren, wenn die Join-Bedingung auf dem Gleichheitsoperator basiert. Dies bedeutet, dass wir Equijoins und Nicht-Equijoins separat betrachten können.
Equijoin kann auf typsichere Weise implementiert werden, indem sowohl Datasets
((Schlüssel, Wert)) Tupel zugeordnet wird, ein Join auf Schlüsselbasis durchgeführt wird und das Ergebnis umgeformt wird:
import org.Apache.spark.sql.Encoder
import org.Apache.spark.sql.Dataset
def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
(f: T => K, g: U => K)
(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
val ds1_ = ds1.map(x => (f(x), x))
val ds2_ = ds2.map(x => (g(x), x))
ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}
Kann mit relationalen Algebra-Operatoren als R ⋈θ S = σθ (R × S) ausgedrückt und direkt in Code umgewandelt werden.
Aktivieren Sie crossJoin
und verwenden Sie joinWith
mit trivial gleichem Prädikat:
spark.conf.set("spark.sql.crossJoin.enabled", true)
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean) = {
ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
Verwenden Sie die crossJoin
-Methode:
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean)
(implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)
val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS
safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
Es sollte beachtet werden, dass sich diese Methoden von einer direkten joinWith
-Anwendung unterscheiden und teure DeserializeToObject
/SerializeFromObject
-Transformationen erfordern (im Vergleich zu dieser direkten joinWith
können logische Operationen mit den Daten verwendet werden).
Dies ähnelt dem in Spark 2.0-Datensatz vs DataFrame beschriebenen Verhalten.
Wenn Sie nicht auf die Spark-SQL-API beschränkt sind, bietet frameless
interessante Typ-sichere Erweiterungen für Datasets
(derzeit unterstützt sie nur Spark 2.0):
import frameless.TypedDataset
val typedPoints1 = TypedDataset.create(points1)
val typedPoints2 = TypedDataset.create(points2)
typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
Dataset
API ist in 1.6 nicht stabil, daher halte ich es nicht für sinnvoll, es dort zu verwenden.
Natürlich sind dieses Design und die beschreibenden Namen nicht notwendig. Sie können die Typklasse problemlos verwenden, um diese Methoden implizit zu Dataset
hinzuzufügen, und es besteht kein Konflikt mit eingebauten Signaturen, sodass beide als joinWith
bezeichnet werden können.
Ein weiteres größeres Problem für die nicht sichere Spark-API besteht darin, dass Sie, wenn Sie zwei Datasets
hinzufügen, eine DataFrame
erhalten. Und dann verlieren Sie Typen von Ihren ursprünglichen zwei Datensätzen.
val a: Dataset[A]
val b: Dataset[B]
val joined: Dataframe = a.join(b)
// what would be great is
val joined: Dataset[C] = a.join(b)(implicit func: (A, B) => C)