wake-up-neo.com

Warum schlägt die Spark-Anwendung mit "ClassNotFoundException: Fehler beim Auffinden der Datenquelle: kafka" als übergeordnetes Laufwerk mit sbt-Assembly fehl?

Ich versuche, ein Beispiel wie https://github.com/Apache/spark/blob/master/examples/src/main/scala/org/Apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala auszuführen . Ich habe mit dem Spark Structured Streaming-Programmierhandbuch unter http://spark.Apache.org/docs/latest/structured-streaming-programming-guide.html angefangen. 

Mein Code lautet

package io.boontadata.spark.job1

import org.Apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running Word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }

}

Ich habe folgende sbt-Dateien hinzugefügt: 

build.sbt: 

name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"

libraryDependencies += "org.Apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.Apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.Apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.Apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.Apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.Apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.Apache.kafka" % "kafka_2.11" % "0.10.1.1"

// META-INF discarding
assemblyMergeStrategy in Assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

Ich habe auch Projekt/Assembly.sbt hinzugefügt

addSbtPlugin("com.eed3si9n" % "sbt-Assembly" % "0.14.3")

Dadurch wird eine Uber-Dose mit den nicht provided-Dosen erstellt. 

Ich schicke mit der folgenden Zeile: 

spark-submit boontadata-spark-job1-Assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic

aber ich bekomme diesen Laufzeitfehler: 

Exception in thread "main" Java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.Apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.Apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        at org.Apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
        at org.Apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
        at org.Apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
        at org.Apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
        at org.Apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
        at org.Apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.Apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
        at Sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at Sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.Java:62)
        at Sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.Java:43)
        at Java.lang.reflect.Method.invoke(Method.Java:498)
        at org.Apache.spark.deploy.SparkSubmit$.org$Apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.Apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.Apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.Apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.Apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: Java.lang.ClassNotFoundException: kafka.DefaultSource
        at Java.net.URLClassLoader.findClass(URLClassLoader.Java:381)
        at Java.lang.ClassLoader.loadClass(ClassLoader.Java:424)
        at Java.lang.ClassLoader.loadClass(ClassLoader.Java:357)
        at org.Apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at org.Apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at scala.util.Try$.apply(Try.scala:192)
        at org.Apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at org.Apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at scala.util.Try.orElse(Try.scala:84)
        at org.Apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
        ... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook

Gibt es eine Möglichkeit zu wissen, welche Klasse nicht gefunden wird, damit ich das Repo von maven.org nach dieser Klasse durchsuchen kann. 

Der lookupDataSource-Quellcode scheint in Zeile 543 zu sein: https://github.com/Apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/Apache/space/sql/execational/cache/space/sql/execoration/ /DataSource.scala , aber ich konnte keine direkte Verbindung mit der Kafka-Datenquelle finden ...

Der vollständige Quellcode ist hier: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f

17
benjguin

Ich habe so versucht es funktioniert für mich. Senden Sie so und lassen Sie mich wissen, wenn Sie irgendwelche Probleme haben

./spark-submit --packages org.Apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/Apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
13

Das Problem ist der folgende Abschnitt in build.sbt:

// META-INF discarding
assemblyMergeStrategy in Assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

Es besagt, dass alle META-INF-Elemente gelöscht werden sollten, einschließlich des "Codes", durch den Datenquellen-Aliase (z. B. kafka) funktionieren.

Die META-INF-Dateien sind jedoch sehr wichtig, damit kafka (und andere Aliasnamen von Streaming-Datenquellen) funktionieren.

Damit kafka-Alias ​​funktioniert, verwendet Spark SQL META-INF/services/org.Apache.spark.sql.sources.DataSourceRegister mit dem folgenden Eintrag:

org.Apache.spark.sql.kafka010.KafkaSourceProvider

KafkaSourceProviderist für die Registrierung verantwortlichkafka-Alias ​​mit der richtigen Streaming-Datenquelle, d. h. KafkaSource .

Nur um zu überprüfen, ob der echte Code tatsächlich verfügbar ist, der "Alias", der den Alias ​​registriert, jedoch nicht, können Sie die kafka-Datenquelle anhand des vollständig qualifizierten Namens (nicht des Alias) wie folgt verwenden:

spark.readStream.
  format("org.Apache.spark.sql.kafka010.KafkaSourceProvider").
  load

Sie werden andere Probleme aufgrund fehlender Optionen wie kafka.bootstrap.servers sehen, aber ...wir schweifen ab.

Eine Lösung ist MergeStrategy.concat all META-INF/services/org.Apache.spark.sql.sources.DataSourceRegister (das würde ein Uber-Jar mit allen Datenquellen, einschließlich der kafka-Datenquelle, erzeugen).

case "META-INF/services/org.Apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
11
Jacek Laskowski

In meinem Fall habe ich auch diese Fehlermeldung erhalten, als ich mit sbt kompilierte. Die Ursache war, dass sbt Assembly das spark-sql-kafka-0-10_2.11-Artefakt nicht als Teil des Fat Jar enthielt.

(Ich würde mich sehr über Kommentare freuen. Die Abhängigkeit wurde nicht als Bereich angegeben, daher sollte nicht davon ausgegangen werden, dass sie "bereitgestellt" wird).

Also habe ich gewechselt, ein normales (schlankes) Jar zu implementieren und die Abhängigkeiten mit den --jars-Parametern in Spark-Submit aufzunehmen.

Um alle Abhängigkeiten an einem Ort zu erfassen, können Sie retrieveManaged := true zu Ihren sbt-Projekteinstellungen hinzufügen oder in der sbt-Konsole Folgendes ausgeben:

> set retrieveManaged := true
> package

Das sollte alle Abhängigkeiten in den lib_managed-Ordner bringen.

Dann können Sie alle diese Dateien kopieren (mit einem Bash-Befehl können Sie beispielsweise so etwas verwenden.)

cd /path/to/your/project

JARLIST=$(find lib_managed -name '*.jar'| paste -sd , -)

spark-submit [other-args] target/your-app-1.0-SNAPSHOT.jar --jars "$JARLIST"
3
ssice

Ich verwende Spark 2.1 und stehe vor dem gleichen Problem Mein Workaround ist 

1) spark-Shell --packages org.Apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

2) cd ~/.ivy2/jars hier sind Sie, alle benötigten Gläser befinden sich jetzt in diesem Ordner

3) kopiere alle Gläser in diesem Ordner auf alle Knoten (kann einen bestimmten Ordner erstellen, der sie enthält)

4) fügen Sie den Ordnernamen zu spark.driver.extraClassPath und spark.driver.extraClassPath hinzu, z. spark.driver.extraClassPath=/opt/jars/*:your_other_jars

5 spark-submit --class ClassNm --Other-Options YourJar.jar funktioniert jetzt einwandfrei

1
dalin qin

Dies ist im Hinblick auf Jacek Laskowskis Antwort.

Diejenigen von Ihnen, die Ihr Projekt auf maven bauen, können dies ausprobieren. Fügen Sie die unten angegebene Zeile zu Ihrem maven-shade-plugin hinzu. 

META-INF/services/org.Apache.spark.sql.sources.DataSourceRegister

Ich habe den Plugin-Code für die POM-Datei als Beispiel notiert, um zu zeigen, wo die Zeile eingefügt werden soll.


<plugin>
    <groupId>org.Apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.Apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>
                            META-INF/services/org.Apache.spark.sql.sources.DataSourceRegister
                        </resource>
                    </transformer>
                </transformers>
                <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
        </execution>
    </executions>
</plugin>

Bitte entschuldigen Sie meine Formatierungsfähigkeiten.

0
Algomeister

Ich verwende gradle als Build-Tool und das shadowJar-Plugin, um UberJar..__ zu erstellen. Die Lösung bestand einfach darin, eine Datei hinzuzufügen 

src/main/resources/META-INF/services/org.Apache.spark.sql.sources.DataSourceRegister  

zum Projekt.

In dieser Datei müssen Sie die Klassennamen der verwendeten DataSources Zeile für Zeile angeben. In diesem Fall wäre dies org.Apache.spark.sql.kafka010.KafkaSourceProvider (finden Sie diesen Klassennamen zum Beispiel hier )

Der Grund ist, dass Spark den Java ServiceLoader in seinen internen Mechanismen zur Verwaltung von Abhängigkeiten verwendet.

Vollständiges Beispiel hier .

0
Falco Winkler

Ich habe es gelöst, indem ich die JAR-Datei in das Treibersystem heruntergeladen habe. Von dort aus lieferte ich das Glas, um mit der Option --jar zu funken.

Es ist auch anzumerken, dass ich die gesamte Spark 2.1-Umgebung in mein Über-Glas-Gefäß packte (da mein Cluster noch auf 1.6.1 ist).

spark-submit --jar /ur/path/spark-sql-kafka-0-10_2.11:2.1.0 --class ClassNm --Other-Options YourJar.jar

0
Gyan