wake-up-neo.com

Aggregatfunktion Zähle die Verwendung mit groupBy in Spark

Ich versuche, in pySpark mehrere Operationen in einer Codezeile auszuführen, und bin mir nicht sicher, ob dies in meinem Fall möglich ist.

Ich möchte die Ausgabe nicht als neuen Datenrahmen speichern.

Mein aktueller Code ist ziemlich einfach:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)

Und meine Absicht ist es, count() nach der Verwendung von groupBy hinzuzufügen, um die Anzahl der Datensätze zu ermitteln, die mit jedem Wert der Spalte timePeriod übereinstimmen. gedruckt\als Ausgabe angezeigt.

Beim Versuch, groupBy(..).count().agg(..) zu verwenden, erhalte ich Ausnahmen.

Gibt es eine Möglichkeit, die Ausdrucke count() und agg(). Show () zu erzielen, ohne den Code in zwei Befehlszeilen aufzuteilen, z. :

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

Oder noch besser, um eine zusammengeführte Ausgabe in agg.show() output umzuwandeln - Eine zusätzliche Spalte, die die gezählte Anzahl von Datensätzen angibt, die mit dem Wert der Zeile übereinstimmen. z.B.:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315
29
Adiel

count() kann in agg() verwendet werden, da groupBy derselbe Ausdruck ist.

Mit Python

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
  .groupBy("timePeriod")
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   )
  .show(20, False)

pySpark SQL-Funktionen doc

Mit Scala

import org.Apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

count(1) zählt die Datensätze nach der ersten Spalte, die gleich count("timePeriod") ist

Mit Java

import static org.Apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)
52
mrsrinivas