wake-up-neo.com

Wie werden kategoriale Funktionen mit spark-ml behandelt?

Wie gehe ich mit kategorialen Daten mit spark-ml und nicht mit spark-mllib um?

Obwohl die Dokumentation nicht sehr klar ist, scheinen Klassifizierer, z. RandomForestClassifier, LogisticRegression haben ein featuresCol-Argument, das den Namen der Funktionsspalte in DataFrame angibt, und ein labelCol-Argument, das den Namen der Spalte der markierten Klassen in DataFrame angibt.

Offensichtlich möchte ich mehr als ein Merkmal in meiner Vorhersage verwenden, also habe ich versucht, die VectorAssembler zu verwenden, um alle meine Merkmale in einem einzigen Vektor unter featuresCol zu setzen. 

Die Variable VectorAssembler akzeptiert jedoch nur numerische Typen, einen booleschen Typ und einen Vektortyp (entsprechend der Spark-Website). Ich kann also keine Zeichenfolgen in meinen Merkmalsvektor einfügen.

Wie soll ich vorgehen? 

30
Rainmaker

Ich wollte nur Holdens Antwort vervollständigen.

Seit Spark 2.3.0 ist OneHotEncoder veraltet und wird in 3.0.0 entfernt. Bitte verwenden Sie stattdessen OneHotEncoderEstimator.

In Scala:

import org.Apache.spark.ml.Pipeline
import org.Apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer}

val df = Seq((0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)).toDF("id", "category1", "category2")

val indexer = new StringIndexer().setInputCol("category1").setOutputCol("category1Index")
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array(indexer.getOutputCol, "category2"))
  .setOutputCols(Array("category1Vec", "category2Vec"))

val pipeline = new Pipeline().setStages(Array(indexer, encoder))

pipeline.fit(df).transform(df).show
// +---+---------+---------+--------------+-------------+-------------+
// | id|category1|category2|category1Index| category1Vec| category2Vec|
// +---+---------+---------+--------------+-------------+-------------+
// |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
// |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
// |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// +---+---------+---------+--------------+-------------+-------------+

In Python:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

df = spark.createDataFrame([(0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)], ["id", "category1", "category2"])

indexer = StringIndexer(inputCol="category1", outputCol="category1Index")
inputs = [indexer.getOutputCol(), "category2"]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["categoryVec1", "categoryVec2"])
pipeline = Pipeline(stages=[indexer, encoder])
pipeline.fit(df).transform(df).show()
# +---+---------+---------+--------------+-------------+-------------+
# | id|category1|category2|category1Index| categoryVec1| categoryVec2|
# +---+---------+---------+--------------+-------------+-------------+
# |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
# |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
# |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# +---+---------+---------+--------------+-------------+-------------+

Seit Spark 1.4.0 liefert MLLib auch das Feature OneHotEncoder , das eine Spalte mit Labelindizes einer Spalte von binären Vektoren mit höchstens einem einzigen Wert zuordnet. 

Durch diese Kodierung können Algorithmen, die kontinuierliche Merkmale wie die logistische Regression erwarten, kategoriale Funktionen verwenden

Betrachten wir die folgende DataFrame:

val df = Seq((0, "a"),(1, "b"),(2, "c"),(3, "a"),(4, "a"),(5, "c"))
            .toDF("id", "category")

Der erste Schritt wäre das Erstellen der indizierten DataFrame mit der StringIndexer:

import org.Apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
                   .setInputCol("category")
                   .setOutputCol("categoryIndex")
                   .fit(df)

val indexed = indexer.transform(df)

indexed.show
// +---+--------+-------------+                                                    
// | id|category|categoryIndex|
// +---+--------+-------------+
// |  0|       a|          0.0|
// |  1|       b|          2.0|
// |  2|       c|          1.0|
// |  3|       a|          0.0|
// |  4|       a|          0.0|
// |  5|       c|          1.0|
// +---+--------+-------------+

Sie können die categoryIndex dann mit OneHotEncoder kodieren:

import org.Apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
                   .setInputCol("categoryIndex")
                   .setOutputCol("categoryVec")

val encoded = encoder.transform(indexed)

encoded.select("id", "categoryVec").show
// +---+-------------+
// | id|  categoryVec|
// +---+-------------+
// |  0|(2,[0],[1.0])|
// |  1|    (2,[],[])|
// |  2|(2,[1],[1.0])|
// |  3|(2,[0],[1.0])|
// |  4|(2,[0],[1.0])|
// |  5|(2,[1],[1.0])|
// +---+-------------+
36
eliasah

Ich werde eine Antwort aus einer anderen Perspektive geben, da ich mich auch nach kategorialen Merkmalen in Bezug auf baumbasierte Modelle in Spark ML (nicht MLlib) gewundert habe und die Dokumentation nicht so klar ist, wie alles funktioniert. 

Wenn Sie eine Spalte in Ihrem Datenrahmen mit pyspark.ml.feature.StringIndexer umwandeln, werden zusätzliche Metadaten in dem Datenrahmen gespeichert, der das umgesetzte Feature speziell als kategoriales Feature kennzeichnet. 

Beim Drucken des Datenrahmens wird ein numerischer Wert angezeigt (dies ist ein Index, der einem Ihrer kategorialen Werte entspricht). Wenn Sie sich das Schema anschauen, sehen Sie, dass Ihre neue transformierte Spalte den Typ double hat. Diese neue Spalte, die Sie mit pyspark.ml.feature.StringIndexer.transform erstellt haben, ist jedoch nicht nur eine normale Doppelspalte, sondern es sind zusätzliche Metadaten zugeordnet, die sehr wichtig sind. Sie können diese Metadaten überprüfen, indem Sie die metadata-Eigenschaft des entsprechenden Felds im Schema Ihres Dataframes betrachten (Sie können auf die Schemaobjekte Ihres Dataframes zugreifen, indem Sie in yourdataframe.schema nachschauen.)

Diese zusätzlichen Metadaten haben zwei wichtige Auswirkungen: 

  1. Wenn Sie .fit() aufrufen, wenn Sie ein baumbasiertes Modell verwenden, scannt es die Metadaten Ihres Datenrahmens und erkennt Felder, die Sie als kodiert kodiert haben, mit Transformern wie pyspark.ml.feature.StringIndexer (wie oben erwähnt, gibt es auch andere Transformatoren, die diesen Effekt wie z pyspark.ml.feature.VectorIndexer). Aus diesem Grund müssen Sie Ihre Features NICHT mit einem Hot-Code codieren, nachdem Sie sie mit StringIndxer konvertiert haben, wenn Sie baumbasierte Modelle in Spark ML verwenden. Sie müssen jedoch weiterhin One-Hot-Codierungen durchführen, wenn Sie andere Modelle verwenden, die dies nicht tun natürlich mit Kategorien umgehen wie lineare Regression usw.). 

  2. Da diese Metadaten im Datenrahmen gespeichert sind, können Sie pyspark.ml.feature.IndexToString verwenden, um die numerischen Indizes jederzeit auf die ursprünglichen kategorialen Werte (häufig Strings) umzukehren. 

21
hamel

Es gibt eine Komponente der ML-Pipeline namens StringIndexer, mit der Sie Ihre Strings auf vernünftige Weise in Double konvertieren können. http://spark.Apache.org/docs/latest/api/scala/index.html#org.Apache.spark.ml.feature.StringIndexer hat mehr Dokumentation und http: // spark. Apache.org/docs/latest/ml-guide.html zeigt, wie Pipelines erstellt werden.

5
Holden

Ich verwende die folgende Methode für oneHotEncoding einer einzelnen Spalte in einem Spark -Datenrahmen:

def ohcOneColumn(df, colName, debug=False):

  colsToFillNa = []

  if debug: print("Entering method ohcOneColumn")
  countUnique = df.groupBy(colName).count().count()
  if debug: print(countUnique)

  collectOnce = df.select(colName).distinct().collect()
  for uniqueValIndex in range(countUnique):
    uniqueVal = collectOnce[uniqueValIndex][0]
    if debug: print(uniqueVal)
    newColName = str(colName) + '_' + str(uniqueVal) + '_TF'
    df = df.withColumn(newColName, df[colName]==uniqueVal)
    colsToFillNa.append(newColName)
  df = df.drop(colName)
  df = df.na.fill(False, subset=colsToFillNa)
  return df

Ich verwende die folgende Methode für oneHotEncoding Spark dataFrames:

from pyspark.sql.functions import col, countDistinct, approxCountDistinct
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator

def detectAndLabelCat(sparkDf, minValCount=5, debug=False, excludeCols=['Target']):
  if debug: print("Entering method detectAndLabelCat")
  newDf = sparkDf
  colList = sparkDf.columns

  for colName in sparkDf.columns:
    uniqueVals = sparkDf.groupBy(colName).count()
    if debug: print(uniqueVals)
    countUnique = uniqueVals.count()
    dtype = str(sparkDf.schema[colName].dataType)
    #dtype = str(df.schema[nc].dataType)
    if (colName in excludeCols):
      if debug: print(str(colName) + ' is in the excluded columns list.')

    Elif countUnique == 1:
      newDf = newDf.drop(colName)
      if debug:
        print('dropping column ' + str(colName) + ' because it only contains one unique value.')
      #end if debug
    #Elif (1==2):
    Elif ((countUnique < minValCount) | (dtype=="String") | (dtype=="StringType")):
      if debug: 
        print(len(newDf.columns))
        oldColumns = newDf.columns
      newDf = ohcOneColumn(newDf, colName, debug=debug)
      if debug: 
        print(len(newDf.columns))
        newColumns = set(newDf.columns) - set(oldColumns)
        print('Adding:')
        print(newColumns)
        for newColumn in newColumns:
          if newColumn in newDf.columns:
            try:
              newUniqueValCount = newDf.groupBy(newColumn).count().count()
              print("There are " + str(newUniqueValCount) + " unique values in " + str(newColumn))
            except:
              print('Uncaught error discussing ' + str(newColumn))
          #else:
          #  newColumns.remove(newColumn)

        print('Dropping:')
        print(set(oldColumns) - set(newDf.columns))

    else:
      if debug: print('Nothing done for column ' + str(colName))

      #end if countUnique == 1, Elif countUnique other condition
    #end outer for
  return newDf
0
Jim

Sie können einen String - Spaltentyp in einem Funken-Datenrahmen in einen numerischen Datentyp umwandeln, indem Sie die Cast-Funktion verwenden.

from pyspark.sql import SQLContext
from pyspark.sql.types import DoubleType, IntegerType

sqlContext = SQLContext(sc)
dataset = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('./data/titanic.csv')   

dataset = dataset.withColumn("Age", dataset["Age"].cast(DoubleType()))
dataset = dataset.withColumn("Survived", dataset["Survived"].cast(IntegerType()))

Im obigen Beispiel lesen wir eine CSV-Datei als Datenrahmen ein, wandeln die Standard-String-Datentypen in Ganzzahl und Doppelte um und überschreiben den ursprünglichen Datenrahmen. Wir können dann den VectorAssembler verwenden, um die Features in einem einzigen Vektor zusammenzuführen und Ihren bevorzugten Spark-ML-Algorithmus anzuwenden.

0
Vadim Smolyakov