Ich teste auf Funke mit Scala. Wir lesen normalerweise Json-Dateien, die wie im folgenden Beispiel bearbeitet werden müssen:
test.json:
{"a":1,"b":[2,3]}
val test = sqlContext.read.json("test.json")
Wie kann ich es in folgendes Format konvertieren:
{"a":1,"b":2}
{"a":1,"b":3}
Sie können die explode
-Funktion verwenden:
scala> import org.Apache.spark.sql.functions.explode
import org.Apache.spark.sql.functions.explode
scala> val test = sqlContext.read.json(sc.parallelize(Seq("""{"a":1,"b":[2,3]}""")))
test: org.Apache.spark.sql.DataFrame = [a: bigint, b: array<bigint>]
scala> test.printSchema
root
|-- a: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: long (containsNull = true)
scala> val flattened = test.withColumn("b", explode($"b"))
flattened: org.Apache.spark.sql.DataFrame = [a: bigint, b: bigint]
scala> flattened.printSchema
root
|-- a: long (nullable = true)
|-- b: long (nullable = true)
scala> flattened.show
+---+---+
| a| b|
+---+---+
| 1| 2|
| 1| 3|
+---+---+
explodieren wird häufig vorgeschlagen, aber es ist von der untypisierten DataFrame-API, und wenn Sie Dataset verwenden, denke ich, dass der flatMap-Operator möglicherweise besser geeignet ist (siehe org.Apache.spark.sql.Dataset ).
flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
(Scala-spezifisch) Gibt ein neues Dataset zurück, indem zuerst eine Funktion auf .__ angewendet wird. alle Elemente dieses Datasets und reduzieren dann die Ergebnisse.
Sie könnten es wie folgt verwenden:
val ds = Seq(
(0, "Lorem ipsum dolor", 1.0, Array("prp1", "prp2", "prp3")))
.toDF("id", "text", "value", "properties")
.as[(Integer, String, Double, scala.List[String])]
scala> ds.flatMap { t =>
t._4.map { prp =>
(t._1, t._2, t._3, prp) }}.show
+---+-----------------+---+----+
| _1| _2| _3| _4|
+---+-----------------+---+----+
| 0|Lorem ipsum dolor|1.0|prp1|
| 0|Lorem ipsum dolor|1.0|prp2|
| 0|Lorem ipsum dolor|1.0|prp3|
+---+-----------------+---+----+
// or just using for-comprehension
for {
t <- ds
prp <- t._4
} yield (t._1, t._2, t._3, prp)