wake-up-neo.com

Wie teste ich PySpark-Programme?

Mein aktueller Java/Spark-Unit-Test-Ansatz funktioniert (detailliert hier ), indem ein SparkContext mit "local" instanziiert und Unit-Tests mit JUnit ausgeführt werden.

Der Code muss so organisiert sein, dass er E/A in einer Funktion ausführt und eine andere mit mehreren RDDs aufruft.

Das funktioniert super. Ich habe eine sehr getestete Datenumwandlung in Java + Spark.

Kann ich dasselbe mit Python machen?

Wie würde ich Spark Unit-Tests mit Python ausführen?

30

Ich würde auch die Verwendung von py.test empfehlen. py.test macht es einfach, wiederverwendbare SparkContext-Test-Fixtures zu erstellen und damit prägnante Testfunktionen zu schreiben. Sie können Fixtures auch spezialisieren (zum Beispiel um einen StreamingContext zu erstellen) und einen oder mehrere davon in Ihren Tests verwenden.

Ich habe zu diesem Thema einen Blogbeitrag über Medium geschrieben:

https://engblog.nextdoor.com/unit-testing-Apache-spark-with-py-test-3b8970dc013b

Hier ist ein Ausschnitt aus dem Beitrag:

pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_Word_counts(spark_context):
    """ test Word couting
    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_Word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results
24
Vikas Kawadia

Hier ist eine Lösung mit pytest, wenn Sie Spark 2.x und SparkSession verwenden. Ich importiere auch ein Paket eines Drittanbieters.

import logging

import pytest
from pyspark.sql import SparkSession

def quiet_py4j():
    """Suppress spark logging for the test context."""
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_session(request):
    """Fixture for creating a spark context."""

    spark = (SparkSession
             .builder
             .master('local[2]')
             .config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1')
             .appName('pytest-pyspark-local-testing')
             .enableHiveSupport()
             .getOrCreate())
    request.addfinalizer(lambda: spark.stop())

    quiet_py4j()
    return spark


def test_my_app(spark_session):
   ...

Beachten Sie, dass ich bei Verwendung von Python 3 Folgendes als PYSPARK_PYTHON-Umgebungsvariable angeben musste:

import os
import sys

IS_PY2 = sys.version_info < (3,)

if not IS_PY2:
    os.environ['PYSPARK_PYTHON'] = 'python3'

Andernfalls erhalten Sie den Fehler:

Ausnahme: Python in Worker hat eine andere Version 2.7 als die in Treiber 3.5, PySpark kann nicht mit verschiedenen Nebenversionen ausgeführt werden. Bitte überprüfen Sie, ob die Umgebungsvariablen PYSPARK_PYTHON und PYSPARK_DRIVER_PYTHON korrekt eingestellt sind.

15
ksindi

Angenommen, Sie haben pyspark installiert, können Sie die folgende Klasse für unitTest in unittest verwenden:

import unittest
import pyspark


class PySparkTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = pyspark.SparkContext(conf=conf)
        cls.spark = pyspark.SQLContext(cls.sc)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

Beispiel:

class SimpleTestCase(PySparkTestCase):

    def test_with_rdd(self):
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]

        input_rdd = self.sc.parallelize(test_input, 1)

        from operator import add

        results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
        self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])

    def test_with_df(self):
        df = self.spark.createDataFrame(data=[[1, 'a'], [2, 'b']], 
                                        schema=['c1', 'c2'])
        self.assertEqual(df.count(), 2)

Beachten Sie, dass dadurch ein Kontext pro Klasse erstellt wird. Verwenden Sie setUp anstelle von setUpClass, um einen Kontext pro Test zu erhalten. Dies erhöht in der Regel den Zeitaufwand für die Ausführung der Tests, da das Erstellen eines neuen spark context derzeit teuer ist.

8
Jorge Leitão

Ich verwende pytest, mit dem Sie Test-Fixtures erstellen können, um einen Pyspark-Kontext zu instanziieren und in all Ihre Tests zu integrieren, die dies erfordern. Etwas in der Art von

@pytest.fixture(scope="session",
                params=[pytest.mark.spark_local('local'),
                        pytest.mark.spark_yarn('yarn')])
def spark_context(request):
    if request.param == 'local':
        conf = (SparkConf()
                .setMaster("local[2]")
                .setAppName("pytest-pyspark-local-testing")
                )
    Elif request.param == 'yarn':
        conf = (SparkConf()
                .setMaster("yarn-client")
                .setAppName("pytest-pyspark-yarn-testing")
                .set("spark.executor.memory", "1g")
                .set("spark.executor.instances", 2)
                )
    request.addfinalizer(lambda: sc.stop())

    sc = SparkContext(conf=conf)
    return sc

def my_test_that_requires_sc(spark_context):
    assert spark_context.textFile('/path/to/a/file').count() == 10

Anschließend können Sie die Tests im lokalen Modus ausführen, indem Sie py.test -m spark_local Oder in YARN mit py.test -m spark_yarn Aufrufen. Das hat bei mir ganz gut geklappt.

8
santon

Vor einiger Zeit war ich auch mit dem gleichen Problem konfrontiert und nach dem Lesen mehrerer Artikel, Foren und einiger StackOverflow-Antworten habe ich ein kleines Plugin für pytest geschrieben: pytest-spark

Ich benutze es bereits seit einigen Monaten und der allgemeine Workflow sieht unter Linux gut aus:

  1. Installiere Apache Spark (setze JVM + entpacke Spark's Distribution in ein Verzeichnis)
  2. Installiere "pytest" + Plugin "pytest-spark"
  3. Erstellen Sie "pytest.ini" in Ihrem Projektverzeichnis und geben Sie dort den Speicherort Spark an.
  4. Führen Sie Ihre Tests wie gewohnt mit pytest durch.
  5. Optional können Sie das Fixture "spark_context" in Ihren Tests verwenden, das vom Plugin bereitgestellt wird - es versucht, die Spark-Protokolle in der Ausgabe zu minimieren.
1
Alex Markov