wake-up-neo.com

Wie verarbeitet Hadoop Datensätze, die über Blockgrenzen verteilt sind?

Laut der Hadoop - The Definitive Guide

Die von FileInputFormats definierten logischen Datensätze passen normalerweise nicht gut in HDFS-Blöcke. Beispielsweise sind die logischen Datensätze eines TextInputFormats Zeilen, die HDFS-Grenzen häufiger überschreiten als nicht. Dies hat keine Auswirkung auf die Funktionsweise Ihres Programms - z. B. werden Zeilen nicht übersehen oder unterbrochen -, aber es lohnt sich zu wissen, da dies bedeutet, dass datenlokale Karten (dh Karten, die auf demselben Host wie ihre ausgeführt werden) vorhanden sind Eingabedaten) führt einige Fernlesevorgänge durch. Der dadurch verursachte geringe Aufwand ist normalerweise nicht signifikant.

Angenommen, eine Datensatzzeile ist auf zwei Blöcke (b1 und b2) aufgeteilt. Der Mapper, der den ersten Block (b1) verarbeitet, wird feststellen, dass die letzte Zeile kein EOL-Trennzeichen hat und holt den Rest der Zeile vom nächsten Datenblock (b2).

Wie stellt der Mapper, der den zweiten Block (b2) verarbeitet, fest, dass der erste Datensatz unvollständig ist und ab dem zweiten Datensatz im Block (b2) verarbeitet werden soll?

116
Praveen Sripati

Interessante Frage, ich habe einige Zeit damit verbracht, den Code nach Details zu durchsuchen, und hier sind meine Gedanken. Die Aufteilungen werden vom Client mit InputFormat.getSplits Behandelt, daher gibt ein Blick auf FileInputFormat die folgenden Informationen:

  • Ermitteln Sie für jede Eingabedatei die Dateilänge, die Blockgröße und berechnen Sie die Teilungsgröße als max(minSize, min(maxSize, blockSize)) wobei maxSizemapred.max.split.size Entspricht und minSizemapred.min.split.size.
  • Teilen Sie die Datei auf der Grundlage der oben berechneten Teilungsgröße in verschiedene FileSplits auf. Wichtig ist hierbei, dass jeder FileSplit mit einem start -Parameter initialisiert wird, der dem Offset in der Eingabedatei entspricht. Zu diesem Zeitpunkt gibt es noch keine Bearbeitung der Leitungen. Der relevante Teil des Codes sieht folgendermaßen aus:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

Wenn Sie sich danach das LineRecordReader ansehen, das durch das TextInputFormat definiert ist, werden die Zeilen dort behandelt:

  • Wenn Sie Ihr LineRecordReader initialisieren, wird versucht, ein LineReader zu instanziieren, bei dem es sich um eine Abstraktion handelt, um Zeilen über FSDataInputStream lesen zu können. Es gibt 2 Fälle:
  • Wenn ein CompressionCodec definiert ist, ist dieser Codec für den Umgang mit Grenzen verantwortlich. Wahrscheinlich nicht relevant für Ihre Frage.
  • Wenn es jedoch keinen Codec gibt, sind die Dinge interessant: Wenn das start Ihres InputSplit nicht 0 ist, dann verfolgen Sie 1 Zeichen zurück und überspringen Sie dann die erste Zeile Begegnung identifiziert durch\n oder\r\n (Windows)! Die Rückverfolgung ist wichtig, da Sie dann, wenn Ihre Zeilengrenzen mit geteilten Grenzen übereinstimmen, die gültige Zeile nicht überspringen. Hier ist der relevante Code:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Da die Teilungen im Client berechnet werden, müssen die Mapper nicht nacheinander ausgeführt werden. Jeder Mapper weiß also bereits, ob die erste Zeile verworfen werden muss oder nicht.

Wenn Sie also 2 Zeilen mit jeweils 100 MB in derselben Datei haben, und zur Vereinfachung sagen wir, dass die Aufteilungsgröße 64 MB beträgt. Wenn dann die Eingabesplits berechnet werden, haben wir das folgende Szenario:

  • Teilen Sie 1 mit dem Pfad und den Hosts zu diesem Block. Initialisiert beim Start 200-200 = 0 MB, Länge 64 MB.
  • Split 2 wird beim Start initialisiert 200-200 + 64 = 64 MB, Länge 64 MB.
  • Split 3 wird beim Start initialisiert 200-200 + 128 = 128 MB, Länge 64 MB.
  • Split 4 beim Start initialisiert 200-200 + 192 = 192Mb, Länge 8Mb.
  • Mapper A verarbeitet Split 1, Start ist 0, überspringen Sie also nicht die erste Zeile und lesen Sie eine vollständige Zeile, die über die 64-MB-Grenze hinausgeht, sodass ein Remote-Lesen erforderlich ist.
  • Mapper B verarbeitet Split 2, Start ist! = 0, überspringen Sie also die erste Zeile nach 64 MB-1 Byte, was dem Ende von Zeile 1 bei 100 MB entspricht, das sich noch in Split 2 befindet. Wir haben also 28 MB der Zeile in Split 2 remote lesen die restlichen 72Mb.
  • Mapper C verarbeitet Split 3, Start ist! = 0. Überspringen Sie also die erste Zeile nach 128 MB-1 Byte, was dem Ende von Zeile 2 bei 200 MB entspricht. Dies ist das Dateiende. Tun Sie also nichts.
  • Mapper D ist mit Mapper C identisch, außer dass nach 192 MB-1 Byte eine neue Zeile gesucht wird.
155
Charles Menguy

Der Map Reduce Algorithmus funktioniert nicht bei physischen Blöcken der Datei. Es funktioniert mit logischen Eingabesplits. Die Aufteilung der Eingabe hängt davon ab, wo der Datensatz geschrieben wurde. Ein Datensatz kann zwei Mapper umfassen.

Bei der Einrichtung von [~ # ~] hdfs [~ # ~] werden sehr große Dateien in große Blöcke unterteilt (z. B. Messen) 128 MB) und speichert drei Kopien dieser Blöcke auf verschiedenen Knoten im Cluster.

HDFS ist sich des Inhalts dieser Dateien nicht bewusst. Möglicherweise wurde ein Datensatz in Block-a gestartet, aber das Ende dieses Datensatzes ist möglicherweise in Block-b.

Um dieses Problem zu lösen, verwendet Hadoop eine logische Darstellung der in Dateiblöcken gespeicherten Daten, die als Eingabesplits bezeichnet werden. Wenn ein MapReduce-Job-Client die Eingaben aufteilt , ermittelt er, wo sich der erste vollständige Datensatz befindet ein Block beginnt und wo der letzte Datensatz im Block endet.

Der entscheidende Punkt:

In Fällen, in denen der letzte Datensatz in einem Block unvollständig ist, enthält die Eingabeaufteilung Positionsinformationen für den nächsten Block und den Byte-Versatz der Daten, die zum Vervollständigen des Datensatzes erforderlich sind.

Schauen Sie sich das folgende Diagramm an.

enter image description here

Schauen Sie sich diese Artikel und verwandte SE-Frage an: Über das Aufteilen von Hadoop/HDFS-Dateien

Weitere Details finden Sie unter Dokumentation

Das Map-Reduce-Framework verwendet das InputFormat des Jobs, um:

  1. Überprüfen Sie die Eingabespezifikation des Jobs.
  2. Teilen Sie die Eingabedatei (en) in logische InputSplits auf, von denen jede einem einzelnen Mapper zugewiesen wird.
  3. Jeder InputSplit wird dann einem einzelnen Mapper zur Verarbeitung zugewiesen. Split könnte Tuple sein. InputSplit[] getSplits(JobConf job,int numSplits) ist die API, die sich um diese Dinge kümmert.

FileInputFormat , das die InputFormat implementierte getSplits () -Methode erweitert. Schauen Sie sich die Interna dieser Methode unter grepcode an

16
Ravindra babu

Ich sehe es so: InputFormat ist dafür verantwortlich, Daten unter Berücksichtigung der Art der Daten in logische Teilungen aufzuteilen.
Nichts hindert es daran, dies zu tun, obwohl dies zu einer erheblichen Verzögerung des Jobs führen kann - die gesamte Logik und das Lesen um die gewünschten Teilgrößengrenzen werden im Jobtracker ausgeführt.
Das einfachste Eingabeformat für Datensätze ist TextInputFormat. Es funktioniert wie folgt (soweit ich das aus dem Code verstehe) - Eingabeformat erstellt Teilungen nach Größe, unabhängig von den Zeilen, aber LineRecordReader immer:
a) Überspringen Sie die erste Zeile in der Teilung (oder einem Teil davon), wenn es sich nicht um die erste Teilung handelt
b) Lesen Sie am Ende eine Zeile nach der Grenze der Teilung (falls Daten verfügbar sind, handelt es sich also nicht um die letzte Teilung).

7
David Gruzman

Nach meinem Verständnis wird der Standardkonstruktor aufgerufen, wenn FileSplit für den ersten Block initialisiert wird. Daher sind die Werte für Start und Länge anfangs Null. Wenn am Ende der Verarbeitung des ersten Blocks die letzte Zeile unvollständig ist, ist der Längenwert größer als die Länge der Teilung und es wird auch die erste Zeile des nächsten Blocks gelesen. Aus diesem Grund ist der Startwert für den ersten Block größer als Null, und unter dieser Bedingung überspringt LineRecordReader die erste Zeile des zweiten Blocks. (Siehe Quelle )

Wenn die letzte Zeile des ersten Blocks vollständig ist, ist der Längenwert gleich der Länge des ersten Blocks und der Wert des Starts für den zweiten Block ist Null. In diesem Fall überspringt LineRecordReader nicht die erste Zeile und liest den zweiten Block vom Anfang an.

Macht Sinn?

3
aa8y

Aus dem Hadoop-Quellcode von LineRecordReader.Java der Konstruktor: Ich finde einige Kommentare:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

ich glaube, dass Hadoop eine zusätzliche Zeile für jeden Split lesen wird (am Ende des aktuellen Split wird die nächste Zeile im nächsten Split gelesen), und wenn dies nicht der Fall ist, wird die erste Zeile weggeworfen. damit kein Zeilendatensatz verloren geht und unvollständig wird

1
Shenghai.Geng

Die Mapper müssen nicht kommunizieren. Die Dateiblöcke befinden sich in HDFS und der aktuelle Mapper (RecordReader) kann den Block lesen, der den verbleibenden Teil der Zeile enthält. Dies geschieht hinter den Kulissen.

0
user3507308