Wie kann ich eine RDD mit komplexen Typen wie Maps/Arrays abfragen? Zum Beispiel, als ich diesen Testcode schrieb:
case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
Ich denke, die Syntax wäre ungefähr so:
sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
oder
sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
aber ich verstehe
Kein geschachteltes Feld vom Typ MapType (StringType, StringType, true)
und
org.Apache.spark.sql.catalyst.errors.package $ TreeNodeException: Nicht aufgelöste Attribute
beziehungsweise.
Das hängt von der Art der Spalte ab. Beginnen wir mit einigen Dummy-Daten:
import org.Apache.spark.sql.functions.{udf, lit}
import scala.util.Try
case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
an_array: Array[Int], a_map: Map[String, String],
a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])
val df = sc.parallelize(Seq(
Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
Array(
ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF
df.registerTempTable("df")
df.printSchema
// root
// |-- an_array: array (nullable = true)
// | |-- element: integer (containsNull = false)
// |-- a_map: map (nullable = true)
// | |-- key: string
// | |-- value: string (valueContainsNull = true)
// |-- a_struct: struct (nullable = true)
// | |-- x: integer (nullable = false)
// |-- an_array_of_structs: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- foo: string (nullable = true)
// | | |-- bar: integer (nullable = false)
// | | |-- vals: array (nullable = true)
// | | | |-- element: double (containsNull = false)
array (ArrayType
) Spalten:
Column.getItem
Methode
df.select($"an_array".getItem(1)).show
// +-----------+
// |an_array[1]|
// +-----------+
// | 2|
// | 5|
// +-----------+
Hive-Klammer-Syntax:
sqlContext.sql("SELECT an_array[1] FROM df").show
// +---+
// |_c0|
// +---+
// | 2|
// | 5|
// +---+
eine UDF
val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)
df.select(get_ith($"an_array", lit(1))).show
// +---------------+
// |UDF(an_array,1)|
// +---------------+
// | 2|
// | 5|
// +---------------+
Zusätzlich zu den oben aufgeführten Methoden unterstützt Spark eine wachsende Liste integrierter Funktionen, die mit komplexen Typen arbeiten. Zu den bemerkenswerten Beispielen gehören Funktionen höherer Ordnung wie transform
(nur SQL, 2.4+):
df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show
// +------------+
// |an_array_inc|
// +------------+
// | [2, 3, 4]|
// | [5, 6, 7]|
// +------------+
filter
(nur SQL, 2.4+)
df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show
// +-------------+
// |an_array_even|
// +-------------+
// | [2]|
// | [4, 6]|
// +-------------+
aggregate
(nur SQL, 2.4+):
df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show
// +------------+
// |an_array_sum|
// +------------+
// | 6|
// | 15|
// +------------+
array-Verarbeitungsfunktionen (array_*
) wie array_distinct
(2.4+):
import org.Apache.spark.sql.functions.array_distinct
df.select(array_distinct($"an_array_of_structs.vals"(0))).show
// +-------------------------------------------+
// |array_distinct(an_array_of_structs.vals[0])|
// +-------------------------------------------+
// | [1.0, 2.0]|
// | [5.0, 6.0]|
// +-------------------------------------------+
array_max
(array_min
, 2.4+):
import org.Apache.spark.sql.functions.array_max
df.select(array_max($"an_array")).show
// +-------------------+
// |array_max(an_array)|
// +-------------------+
// | 3|
// | 6|
// +-------------------+
flatten
(2.4+)
import org.Apache.spark.sql.functions.flatten
df.select(flatten($"an_array_of_structs.vals")).show
// +---------------------------------+
// |flatten(an_array_of_structs.vals)|
// +---------------------------------+
// | [1.0, 2.0, 2.0, 3...|
// | [5.0, 6.0, 7.0, 8.0]|
// +---------------------------------+
arrays_Zip
(2.4+):
import org.Apache.spark.sql.functions.arrays_Zip
df.select(arrays_Zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false)
// +--------------------------------------------------------------------+
// |arrays_Zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
// +--------------------------------------------------------------------+
// |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] |
// |[[5.0, 7.0], [6.0, 8.0]] |
// +--------------------------------------------------------------------+
array_union
(2.4+):
import org.Apache.spark.sql.functions.array_union
df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show
// +---------------------------------------------------------------------+
// |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])|
// +---------------------------------------------------------------------+
// | [1.0, 2.0, 3.0, 4...|
// | [5.0, 6.0, 7.0, 8.0]|
// +---------------------------------------------------------------------+
slice
(2.4+):
import org.Apache.spark.sql.functions.slice
df.select(slice($"an_array", 2, 2)).show
// +---------------------+
// |slice(an_array, 2, 2)|
// +---------------------+
// | [2, 3]|
// | [5, 6]|
// +---------------------+
zuordnungsspalten (MapType
)
mit Column.getField
Methode:
df.select($"a_map".getField("foo")).show
// +----------+
// |a_map[foo]|
// +----------+
// | bar|
// | null|
// +----------+
verwenden der Syntax für Hive-Klammern:
sqlContext.sql("SELECT a_map['foz'] FROM df").show
// +----+
// | _c0|
// +----+
// |null|
// | baz|
// +----+
verwenden eines vollständigen Pfads mit Punktsyntax:
df.select($"a_map.foo").show
// +----+
// | foo|
// +----+
// | bar|
// |null|
// +----+
mit einer UDF
val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))
df.select(get_field($"a_map", lit("foo"))).show
// +--------------+
// |UDF(a_map,foo)|
// +--------------+
// | bar|
// | null|
// +--------------+
Wachsende Anzahl von map_*
-Funktionen wie map_keys
(2.3+)
import org.Apache.spark.sql.functions.map_keys
df.select(map_keys($"a_map")).show
// +---------------+
// |map_keys(a_map)|
// +---------------+
// | [foo]|
// | [foz]|
// +---------------+
oder map_values
(2.3+)
import org.Apache.spark.sql.functions.map_values
df.select(map_values($"a_map")).show
// +-----------------+
// |map_values(a_map)|
// +-----------------+
// | [bar]|
// | [baz]|
// +-----------------+
Bitte überprüfen Sie SPARK-23899 für eine detaillierte Liste.
struct (StructType
) Spalten mit vollständigem Pfad und Punktsyntax:
mit der DataFrame-API
df.select($"a_struct.x").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
mit rohem SQL
sqlContext.sql("SELECT a_struct.x FROM df").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
auf Felder innerhalb des Arrays von structs
kann mit Punktsyntax, Namen und Standard Column
-Methoden zugegriffen werden:
df.select($"an_array_of_structs.foo").show
// +----------+
// | foo|
// +----------+
// |[foo, bar]|
// |[foz, baz]|
// +----------+
sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
// +---+
// |_c0|
// +---+
// |foo|
// |foz|
// +---+
df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show
// +------------------------------+
// |an_array_of_structs.vals[1][1]|
// +------------------------------+
// | 4.0|
// | 8.0|
// +------------------------------+
auf benutzerdefinierte Felder (UDTs) kann über UDFs zugegriffen werden. Weitere Informationen finden Sie unter SparkSQL-Referenzierungsattribute von UDT .
Anmerkungen:
HiveContext
verfügbar sein. UDFs sollten unabhängig von der Version mit dem Standard SQLContext
und HiveContext
funktionieren.im Allgemeinen sind verschachtelte Werte Bürger zweiter Klasse. Nicht alle typischen Operationen werden für verschachtelte Felder unterstützt. Je nach Kontext kann es besser sein, das Schema zu glätten und/oder Sammlungen aufzulösen
df.select(explode($"an_array_of_structs")).show
// +--------------------+
// | col|
// +--------------------+
// |[foo,1,WrappedArr...|
// |[bar,2,WrappedArr...|
// |[foz,3,WrappedArr...|
// |[baz,4,WrappedArr...|
// +--------------------+
Die Punktsyntax kann mit einem Platzhalterzeichen (*
) kombiniert werden, um (möglicherweise mehrere) Felder auszuwählen, ohne die Namen explizit anzugeben:
df.select($"a_struct.*").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
JSON-Spalten können mit den Funktionen get_json_object
und from_json
abgefragt werden. Siehe Wie kann ich eine JSON-Datenspalte mit Spark DataFrames abfragen? für Details.
Sobald Sie es in DF konvertieren, können Sie Daten einfach als abrufen
val rddRow= rdd.map(kv=>{
val k = kv._1
val v = kv._2
Row(k, v)
})
val myFld1 = StructField("name", org.Apache.spark.sql.types.StringType, true)
val myFld2 = StructField("map", org.Apache.spark.sql.types.MapType(StringType, StringType), true)
val arr = Array( myFld1, myFld2)
val schema = StructType( arr )
val rowrddDF = sqc.createDataFrame(rddRow, schema)
rowrddDF.registerTempTable("rowtbl")
val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one"))
or
val rowrddDFFinal = rowrddDF.select("map.one")
hier war was ich getan habe und es hat funktioniert
case class Test(name: String, m: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
val rdddf = rdd.toDF
rdddf.registerTempTable("mytable")
sqlContext.sql("select m.hello from mytable").show
Ergebnisse
+------+
| hello|
+------+
| world|
|people|
+------+