wake-up-neo.com

Speichern Sie Spark-Datenrahmen als dynamische partitionierte Tabelle in Hive

Ich habe eine Beispielanwendung, die zum Lesen von CSV-Dateien in einen Datenrahmen arbeitet. Das Datenframe kann mit der Methode df.saveAsTable(tablename,mode) im Parkettformat in einer Hive-Tabelle gespeichert werden. 

Der obige Code funktioniert gut, aber ich habe so viele Daten für jeden Tag, dass ich die Hive-Tabelle basierend auf dem Erstellungsdatum (Spalte in der Tabelle) dynamisch partitionieren möchte.

gibt es eine Möglichkeit, den Datenrahmen dynamisch zu partitionieren und im Hive-Lager zu speichern. Möchten Sie die Einfügeanweisung nicht mit hivesqlcontext.sql(insert into table partittioin by(date)....) hartcodieren.

Die Frage kann als eine Erweiterung für Folgendes angesehen werden: Wie kann ich DataFrame direkt in Hive speichern?

jede Hilfe wird sehr geschätzt.

23
Chetandalal

Ich glaube es funktioniert ungefähr so:

df ist ein Datenrahmen mit Jahr, Monat und anderen Spalten

df.write.partitionBy('year', 'month').saveAsTable(...)

oder

df.write.partitionBy('year', 'month').insertInto(...)
20
mdurant

Ich konnte mit df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table") in eine partitionierte Hive-Tabelle schreiben.

Ich musste die folgenden Eigenschaften aktivieren, damit es funktioniert.

 hiveContext.setConf ("Hive.exec.dynamic.partition", "true") 
 hiveContext.setConf ("Hive.exec.dynamic.partition.mode", "nonstrict") 
29
Jins George

Ich sah mich auch der gleichen Sache gegenüber, aber mit den folgenden Tricks löste ich mich auf.

  1. Wenn eine Tabelle als partitioniert ausgeführt wird, dann wird bei der partitionierten Spalte die Groß- und Kleinschreibung berücksichtigt.

  2. Partitionierte Spalten sollten in DataFrame mit demselben Namen vorhanden sein (Groß- und Kleinschreibung beachten). Code:

    var dbName="your database name"
    var finaltable="your table name"
    
    // First check if table is available or not..
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
         //If table is not available then it will create for you..
         println("Table Not Present \n  Creating table " + finaltable)
         sparkSession.sql("use Database_Name")
         sparkSession.sql("SET Hive.exec.dynamic.partition = true")
         sparkSession.sql("SET Hive.exec.dynamic.partition.mode = nonstrict ")
         sparkSession.sql("SET Hive.exec.max.dynamic.partitions.pernode = 400")
         sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID        string,EMP_Name          string,EMP_Address               string,EMP_Salary    bigint)  PARTITIONED BY (EMP_DEP STRING)")
         //Table is created now insert the DataFrame in append Mode
         df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
    }
    
6
Nilesh Shinde

Das funktioniert bei mir. Ich setze diese Einstellungen und füge die Daten dann in partitionierten Tabellen ein.

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("Hive.exec.dynamic.partition", "true")
sqlContext.setConf("Hive.exec.dynamic.partition.mode", 
"nonstrict")
0
Shaunak Bangale

Dies funktionierte für mich mit Python und Spark 2.1.0.

Nicht sicher, ob dies der beste Weg ist, aber es funktioniert ...

# WRITE DATA INTO A Hive TABLE
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("Hive.exec.dynamic.partition", "true") \
    .config("Hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

### CREATE Hive TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS Hive_df (col1 INT, col2 STRING, partition_bin INT)
USING Hive OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'Hive_df'
""")
spark.sql("""
INSERT INTO Hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### CREATE NON Hive TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_Hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_Hive_df'
""")
spark.sql("""
INSERT INTO non_Hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE Hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_Hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")

spark.sql("SELECT * FROM Hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_Hive_df").show() # 1 row full table overwrite
0
isichei