Laut der Hadoop - The Definitive Guide
Die von FileInputFormats definierten logischen Datensätze passen normalerweise nicht gut in HDFS-Blöcke. Beispielsweise sind die logischen Datensätze eines TextInputFormats Zeilen, die HDFS-Grenzen häufiger überschreiten als nicht. Dies hat keine Auswirkung auf die Funktionsweise Ihres Programms - z. B. werden Zeilen nicht übersehen oder unterbrochen -, aber es lohnt sich zu wissen, da dies bedeutet, dass datenlokale Karten (dh Karten, die auf demselben Host wie ihre ausgeführt werden) vorhanden sind Eingabedaten) führt einige Fernlesevorgänge durch. Der dadurch verursachte geringe Aufwand ist normalerweise nicht signifikant.
Angenommen, eine Datensatzzeile ist auf zwei Blöcke (b1 und b2) aufgeteilt. Der Mapper, der den ersten Block (b1) verarbeitet, wird feststellen, dass die letzte Zeile kein EOL-Trennzeichen hat und holt den Rest der Zeile vom nächsten Datenblock (b2).
Wie stellt der Mapper, der den zweiten Block (b2) verarbeitet, fest, dass der erste Datensatz unvollständig ist und ab dem zweiten Datensatz im Block (b2) verarbeitet werden soll?
Interessante Frage, ich habe einige Zeit damit verbracht, den Code nach Details zu durchsuchen, und hier sind meine Gedanken. Die Aufteilungen werden vom Client mit InputFormat.getSplits
Behandelt, daher gibt ein Blick auf FileInputFormat die folgenden Informationen:
max(minSize, min(maxSize, blockSize))
wobei maxSize
mapred.max.split.size
Entspricht und minSize
mapred.min.split.size
.Teilen Sie die Datei auf der Grundlage der oben berechneten Teilungsgröße in verschiedene FileSplit
s auf. Wichtig ist hierbei, dass jeder FileSplit
mit einem start
-Parameter initialisiert wird, der dem Offset in der Eingabedatei entspricht. Zu diesem Zeitpunkt gibt es noch keine Bearbeitung der Leitungen. Der relevante Teil des Codes sieht folgendermaßen aus:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
Wenn Sie sich danach das LineRecordReader
ansehen, das durch das TextInputFormat
definiert ist, werden die Zeilen dort behandelt:
LineRecordReader
initialisieren, wird versucht, ein LineReader
zu instanziieren, bei dem es sich um eine Abstraktion handelt, um Zeilen über FSDataInputStream
lesen zu können. Es gibt 2 Fälle:CompressionCodec
definiert ist, ist dieser Codec für den Umgang mit Grenzen verantwortlich. Wahrscheinlich nicht relevant für Ihre Frage.Wenn es jedoch keinen Codec gibt, sind die Dinge interessant: Wenn das start
Ihres InputSplit
nicht 0 ist, dann verfolgen Sie 1 Zeichen zurück und überspringen Sie dann die erste Zeile Begegnung identifiziert durch\n oder\r\n (Windows)! Die Rückverfolgung ist wichtig, da Sie dann, wenn Ihre Zeilengrenzen mit geteilten Grenzen übereinstimmen, die gültige Zeile nicht überspringen. Hier ist der relevante Code:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
Da die Teilungen im Client berechnet werden, müssen die Mapper nicht nacheinander ausgeführt werden. Jeder Mapper weiß also bereits, ob die erste Zeile verworfen werden muss oder nicht.
Wenn Sie also 2 Zeilen mit jeweils 100 MB in derselben Datei haben, und zur Vereinfachung sagen wir, dass die Aufteilungsgröße 64 MB beträgt. Wenn dann die Eingabesplits berechnet werden, haben wir das folgende Szenario:
Der Map Reduce Algorithmus funktioniert nicht bei physischen Blöcken der Datei. Es funktioniert mit logischen Eingabesplits. Die Aufteilung der Eingabe hängt davon ab, wo der Datensatz geschrieben wurde. Ein Datensatz kann zwei Mapper umfassen.
Bei der Einrichtung von [~ # ~] hdfs [~ # ~] werden sehr große Dateien in große Blöcke unterteilt (z. B. Messen) 128 MB) und speichert drei Kopien dieser Blöcke auf verschiedenen Knoten im Cluster.
HDFS ist sich des Inhalts dieser Dateien nicht bewusst. Möglicherweise wurde ein Datensatz in Block-a gestartet, aber das Ende dieses Datensatzes ist möglicherweise in Block-b.
Um dieses Problem zu lösen, verwendet Hadoop eine logische Darstellung der in Dateiblöcken gespeicherten Daten, die als Eingabesplits bezeichnet werden. Wenn ein MapReduce-Job-Client die Eingaben aufteilt , ermittelt er, wo sich der erste vollständige Datensatz befindet ein Block beginnt und wo der letzte Datensatz im Block endet.
Der entscheidende Punkt:
In Fällen, in denen der letzte Datensatz in einem Block unvollständig ist, enthält die Eingabeaufteilung Positionsinformationen für den nächsten Block und den Byte-Versatz der Daten, die zum Vervollständigen des Datensatzes erforderlich sind.
Schauen Sie sich das folgende Diagramm an.
Schauen Sie sich diese Artikel und verwandte SE-Frage an: Über das Aufteilen von Hadoop/HDFS-Dateien
Weitere Details finden Sie unter Dokumentation
Das Map-Reduce-Framework verwendet das InputFormat des Jobs, um:
InputSplit[] getSplits(JobConf job,int numSplits
) ist die API, die sich um diese Dinge kümmert.FileInputFormat , das die InputFormat
implementierte getSplits
() -Methode erweitert. Schauen Sie sich die Interna dieser Methode unter grepcode an
Ich sehe es so: InputFormat ist dafür verantwortlich, Daten unter Berücksichtigung der Art der Daten in logische Teilungen aufzuteilen.
Nichts hindert es daran, dies zu tun, obwohl dies zu einer erheblichen Verzögerung des Jobs führen kann - die gesamte Logik und das Lesen um die gewünschten Teilgrößengrenzen werden im Jobtracker ausgeführt.
Das einfachste Eingabeformat für Datensätze ist TextInputFormat. Es funktioniert wie folgt (soweit ich das aus dem Code verstehe) - Eingabeformat erstellt Teilungen nach Größe, unabhängig von den Zeilen, aber LineRecordReader immer:
a) Überspringen Sie die erste Zeile in der Teilung (oder einem Teil davon), wenn es sich nicht um die erste Teilung handelt
b) Lesen Sie am Ende eine Zeile nach der Grenze der Teilung (falls Daten verfügbar sind, handelt es sich also nicht um die letzte Teilung).
Nach meinem Verständnis wird der Standardkonstruktor aufgerufen, wenn FileSplit
für den ersten Block initialisiert wird. Daher sind die Werte für Start und Länge anfangs Null. Wenn am Ende der Verarbeitung des ersten Blocks die letzte Zeile unvollständig ist, ist der Längenwert größer als die Länge der Teilung und es wird auch die erste Zeile des nächsten Blocks gelesen. Aus diesem Grund ist der Startwert für den ersten Block größer als Null, und unter dieser Bedingung überspringt LineRecordReader
die erste Zeile des zweiten Blocks. (Siehe Quelle )
Wenn die letzte Zeile des ersten Blocks vollständig ist, ist der Längenwert gleich der Länge des ersten Blocks und der Wert des Starts für den zweiten Block ist Null. In diesem Fall überspringt LineRecordReader
nicht die erste Zeile und liest den zweiten Block vom Anfang an.
Macht Sinn?
Aus dem Hadoop-Quellcode von LineRecordReader.Java der Konstruktor: Ich finde einige Kommentare:
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
ich glaube, dass Hadoop eine zusätzliche Zeile für jeden Split lesen wird (am Ende des aktuellen Split wird die nächste Zeile im nächsten Split gelesen), und wenn dies nicht der Fall ist, wird die erste Zeile weggeworfen. damit kein Zeilendatensatz verloren geht und unvollständig wird
Die Mapper müssen nicht kommunizieren. Die Dateiblöcke befinden sich in HDFS und der aktuelle Mapper (RecordReader) kann den Block lesen, der den verbleibenden Teil der Zeile enthält. Dies geschieht hinter den Kulissen.