wake-up-neo.com

Filtern von DataFrame anhand der Länge einer Spalte

Ich möchte eine DataFrame mit einer Bedingung filtern, die sich auf die Länge einer Spalte bezieht. Diese Frage ist möglicherweise sehr einfach, aber ich habe keine verwandte Frage in der SO gefunden. 

Genauer gesagt, ich habe eine DataFrame mit nur einer Column welche von ArrayType(StringType()), ich möchte die DataFrame filtern.

df = sqlContext.read.parquet("letters.parquet")
df.show()

# The output will be 
# +------------+
# |      tokens|
# +------------+
# |[L, S, Y, S]|
# |[L, V, I, S]|
# |[I, A, N, A]|
# |[I, L, S, A]|
# |[E, N, N, Y]|
# |[E, I, M, A]|
# |[O, A, N, A]|
# |   [S, U, S]|
# +------------+

# But I want only the entries with length 3 or less
fdf = df.filter(len(df.tokens) <= 3)
fdf.show() # But it says that the TypeError: object of type 'Column' has no len(), so the previous statement is obviously incorrect.

Ich habe Column's Documentation gelesen, aber keine nützliche Eigenschaft für diese Angelegenheit gefunden. Ich freue mich über jede Hilfe! 

26

In Spark> = 1.5 können Sie die Funktion size verwenden:

from pyspark.sql.functions import col, size

df = sqlContext.createDataFrame([
    (["L", "S", "Y", "S"],  ),
    (["L", "V", "I", "S"],  ),
    (["I", "A", "N", "A"],  ),
    (["I", "L", "S", "A"],  ),
    (["E", "N", "N", "Y"],  ),
    (["E", "I", "M", "A"],  ),
    (["O", "A", "N", "A"],  ),
    (["S", "U", "S"],  )], 
    ("tokens", ))

df.where(size(col("tokens")) <= 3).show()

## +---------+
## |   tokens|
## +---------+
## |[S, U, S]|
## +---------+

In Spark <1.5 sollte eine UDF den Trick tun:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

size_ = udf(lambda xs: len(xs), IntegerType())

df.where(size_(col("tokens")) <= 3).show()

## +---------+
## |   tokens|
## +---------+
## |[S, U, S]|
## +---------+

Wenn Sie HiveContext verwenden, sollte size UDF mit Raw SQL mit jeder Version funktionieren:

df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE size(tokens) <= 3").show()

## +--------------------+
## |              tokens|
## +--------------------+
## |ArrayBuffer(S, U, S)|
## +--------------------+

Für String-Spalten können Sie entweder eine udf-Funktion oder eine length-Funktion verwenden:

from pyspark.sql.functions import length

df = sqlContext.createDataFrame([("fooo", ), ("bar", )], ("k", ))
df.where(length(col("k")) <= 3).show()

## +---+
## |  k|
## +---+
## |bar|
## +---+
48
zero323

Hier ist ein Beispiel für String in Scala:

val stringData = Seq(("Maheswara"), ("Mokshith"))
val df = sc.parallelize(stringData).toDF
df.where((length($"value")) <= 8).show
+--------+
|   value|
+--------+
|Mokshith|
+--------+
df.withColumn("length", length($"value")).show
+---------+------+
|    value|length|
+---------+------+
|Maheswara|     9|
| Mokshith|     8|
+---------+------+
1
mputha

@ AlbertoBonsanto: unterhalb der Codefilter basierend auf der Arraygröße:

val input = Seq(("a1,a2,a3,a4,a5"), ("a1,a2,a3,a4"), ("a1,a2,a3"), ("a1,a2"), ("a1"))
val df = sc.parallelize(input).toDF("tokens")
val tokensArrayDf = df.withColumn("tokens", split($"tokens", ","))
tokensArrayDf.show
+--------------------+
|              tokens|
+--------------------+
|[a1, a2, a3, a4, a5]|
|    [a1, a2, a3, a4]|
|        [a1, a2, a3]|
|            [a1, a2]|
|                [a1]|
+--------------------+

tokensArrayDf.filter(size($"tokens") > 3).show
+--------------------+
|              tokens|
+--------------------+
|[a1, a2, a3, a4, a5]|
|    [a1, a2, a3, a4]|
+--------------------+
0
mputha