wake-up-neo.com

Verketten Sie Spalten in Apache Spark DataFrame

Wie verketten wir zwei Spalten in einem Apache Spark DataFrame? Gibt es eine Funktion in Spark SQL, die wir verwenden können?

69
Nipun

Mit Raw SQL können Sie CONCAT verwenden:

  • In Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
  • In Scala

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    

Seit Spark 1.5.0 können Sie die concat-Funktion mit der DataFrame-API verwenden:

  • In Python:

    from pyspark.sql.functions import concat, col, lit
    
    df.select(concat(col("k"), lit(" "), col("v")))
    
  • In Scala:

    import org.Apache.spark.sql.functions.{concat, lit}
    
    df.select(concat($"k", lit(" "), $"v"))
    

Es gibt auch eine concat_ws-Funktion, die ein String-Trennzeichen als erstes Argument verwendet.

123
zero323

So können Sie benutzerdefinierte Benennungen vornehmen

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

gibt, 

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

erstellen Sie eine neue Spalte durch Verketten:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+
25
muon

Wenn Sie DF verwenden möchten, können Sie mithilfe einer udf eine neue Spalte hinzufügen, die auf vorhandenen Spalten basiert.

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
15
Danish Shrestha

Eine Möglichkeit, String-Spalten in Spark Scala zu verketten, ist concat.

auf Nullwerte prüfen . Wenn eine der Spalten null ist, ist das Ergebnis auch dann null, wenn eine der anderen Spalten Informationen enthält.

Verwendung von concat und withColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

Verwendung von concat und select:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

Bei beiden Ansätzen haben Sie einen NEW_COLUMN, dessen Wert eine Verkettung der Spalten ist: COL1 und COL2 aus Ihrer ursprünglichen Datenbank. 

9
Ignacio Alorre

Hier ist eine andere Möglichkeit, dies für Pyspark zu tun:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+
6
Teddy Belay

Hier ein Vorschlag, wenn Sie die Nummer oder den Namen der Spalten im Dataframe nicht kennen.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
4
wones0120

Ab Spark 2.3 ( SPARK-22771 ) unterstützt Spark SQL den Verkettungsoperator ||

Zum Beispiel;

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")
3
Krishas

In Spark 2.3.0 können Sie Folgendes tun:

spark.sql( """ select '1' || column_a from table_a """)
1
Charlie 木匠

In Java können Sie dazu mehrere Spalten verketten. Der Beispielcode soll Ihnen ein Szenario und die Verwendung zum besseren Verständnis vermitteln.

SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
                        .withColumn("concatenatedCol",
                                concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));


class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}

Der obige Code verkettet col1, col2, col3 durch "_" getrennt, um eine Spalte mit dem Namen "concatenatedCol" zu erstellen.

0
wandermonk

In der Tat gibt es einige schöne integrierte Abstraktionen, mit denen Sie Ihre Verkettung durchführen können, ohne eine benutzerdefinierte Funktion implementieren zu müssen. Da Sie Spark SQL erwähnt haben, versuchen Sie vermutlich, es als deklarativen Befehl über spark.sql () zu übergeben. In diesem Fall können Sie auf einfache Weise den folgenden SQL-Befehl eingeben: SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;

Außerdem können Sie ab Spark 2.3.0 Befehle in folgenden Zeilen verwenden: SELECT col1 || col2 AS concat_column_name FROM <table_name>;

Hierbei handelt es sich um das bevorzugte Trennzeichen (kann auch ein leerer Bereich sein) und um die temporäre oder permanente Tabelle, aus der Sie lesen möchten.

0
user11768920

Eine andere Möglichkeit, dies in pySpark mit sqlContext zu tun ...

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))
0
Gur