wake-up-neo.com

Wie bekomme ich Daten vom alten Versatzpunkt in Kafka?

Ich benutze Zoookeeper, um Daten von Kafka zu erhalten. Und hier bekomme ich immer Daten vom letzten Versatzpunkt. Gibt es eine Möglichkeit, den Versatzzeitpunkt anzugeben, um alte Daten zu erhalten?

Es gibt eine Option autooffset.reset. Es akzeptiert kleinste oder größte. Kann jemand bitte erklären, was am kleinsten und größten ist. Kann autooffset.reset beim Abrufen von Daten vom alten Versatzpunkt statt vom letzten Versatzpunkt helfen?

35
Sourabh

Die Verbraucher gehören immer zu einer Gruppe, und der Zookeeper verfolgt für jede Partition den Fortschritt dieser Verbrauchergruppe in der Partition.

Um von Anfang an abzurufen, können Sie alle Daten löschen, die mit dem Fortschritt verknüpft sind, wie von Hussain angegeben

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

Sie können auch den gewünschten Versatz der Partition angeben, wie in core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala angegeben

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

Der Versatz ist jedoch nicht zeitindiziert, aber Sie wissen für jede Partition eine Sequenz.

Wenn Ihre Nachricht einen Zeitstempel enthält (und beachten Sie, dass dieser Zeitstempel nichts mit dem Moment zu tun hat, zu dem Kafka Ihre Nachricht erhalten hat), können Sie versuchen, einen Indexer auszuführen, der versucht, einen Eintrag schrittweise abzurufen, indem Sie den Versatz um N erhöhen und den Wert speichern Tuple (Thema X, Teil 2, Offset 100, Zeitstempel) irgendwo.

Wenn Sie Einträge von einem bestimmten Zeitpunkt abrufen möchten, können Sie eine binäre Suche auf Ihren groben Index anwenden, bis Sie den gewünschten Eintrag finden und von dort abrufen.

24
Alex Rodrigues

Aus der Kafka Dokumentation sagen sie "Kafka.api.OffsetRequest.EarliestTime () findet den Anfang der Daten in den Protokollen und beginnt von dort aus zu streamen Neue Nachrichten. Gehen Sie nicht davon aus, dass Offset 0 der Anfangsoffset ist, da Nachrichten aus dem Protokoll im Laufe der Zeit altern. "

Verwenden Sie das SimpleConsumerExample hier: https://cwiki.Apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Ähnliche Frage: Kafka High Level Consumer holt alle Nachrichten mit Java-API von einem Thema ab (Entspricht --from-beginn)

Das könnte helfen

8
Hild

In dem Dokument über kafka config: http://kafka.Apache.org/08/configuration.html finden Sie Informationen zu den kleinsten und größten Werten der Offset-Parameter.

Übrigens, bei der Erkundung von Kafka habe ich mich gefragt, wie man alle Nachrichten für einen Verbraucher wiedergibt. Ich meine, wenn eine Verbrauchergruppe alle Nachrichten abgefragt hat und diese erneut abrufen möchte. 

Dies kann erreicht werden, indem Daten aus dem Zoopeeper gelöscht werden. Verwenden Sie die Klasse kafka.utils.ZkUtils, um einen Knoten im zookeeper zu löschen. Unten ist die Verwendung:

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
3
Hussain Pirosha

Zur Zeit

Kafka FAQ gibt eine Antwort auf dieses Problem.

Wie erhalte ich mit OffsetRequest präzise Versätze von Nachrichten für einen bestimmten Zeitstempel?

Kafka erlaubt das Abfragen von Offsets von Nachrichten nach Zeit und dies bei einer Segmentgranularität. Der Parameter timestamp ist der Unix-Timestamp. Wenn Sie den Offset nach Timestamp abfragen, wird der späteste mögliche Offset der Nachricht zurückgegeben, der nicht später als der angegebene Timestamp angefügt wird. Es gibt zwei spezielle Werte des Zeitstempels - späteste und früheste. Für jeden anderen Wert des Unix-Zeitstempels erhält Kafka den Startoffset des Protokollsegments, das nicht später als der angegebene Zeitstempel erstellt wird. Aufgrund dessen und da die Offset-Anforderung nur mit Segmentgranularität bedient wird, gibt die Offset-Hol-Anforderung weniger genaue Ergebnisse für größere Segmentgrößen zurück.

Um genauere Ergebnisse zu erhalten, können Sie die Größe des Protokollsegments nach Zeit (log.roll.ms) anstatt nach Größe (log.segment.bytes) konfigurieren. Es ist jedoch Vorsicht geboten, da dies die Anzahl der Dateibehandler aufgrund des häufigen Rollens von Protokollsegmenten erhöhen kann.


Zukunftsplan

Kafka fügt dem Nachrichtenformat einen Zeitstempel hinzu. Beziehen auf 

https://cwiki.Apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

2
zheolong

Kafka Protocol Doc ist eine großartige Quelle für Anfragen/Antworten/Offsets/Messages: https://cwiki.Apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol Sie verwenden das Simple Consumer-Beispiel, in dem der folgende Code den Status veranschaulicht:

FetchRequest req = new FetchRequestBuilder()

        .clientId(clientName)

        .addFetch(a_topic, a_partition, readOffset, 100000) 

        .build();

FetchResponse fetchResponse = simpleConsumer.fetch(req);

setOfset auf Startoffset von einstellen. Sie müssen aber auch den maximalen Offset überprüfen, um die Anzahl der Offsets gemäß FetchSize im letzten Parameter der addFetch-Methode zu beschränken.

1
usman

Mit dem KafkaConsumer können Sie Seek, SeekToBeginning und SeekToEnd verwenden, um sich im Stream zu bewegen.

https://kafka.Apache.org/0100/javadoc/org/Apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(Java.util.Collection)

Wenn keine Partition bereitgestellt wird, wird der erste Offset für alle derzeit zugewiesenen Partitionen gesucht.

0
CamW

hast du das probiert 

bin/kafka-console-consumer.sh --bootstrap-server localhost: 9092 - top-test - von-anfang

Es würde alle Meldungen für das gegebene Thema ausgeben, in diesem Beispiel "test". 

Weitere Details über diesen Link https://kafka.Apache.org/quickstart

0
Gang Peng