wake-up-neo.com

Gibt es eine Möglichkeit, vor einem Lauf alle Daten eines Themas zu löschen oder das Thema zu löschen?

Gibt es eine Möglichkeit, vor einem Lauf alle Daten eines Themas zu löschen oder das Thema zu löschen?

Kann ich die KafkaConfig.scala-Datei ändern, um die logRetentionHours-Eigenschaft zu ändern? Gibt es eine Möglichkeit, die Nachrichten zu löschen, sobald der Verbraucher sie liest?

Ich verwende Produzenten, um die Daten von irgendwoher abzurufen und die Daten an ein bestimmtes Thema zu senden, in dem ein Verbraucher konsumiert. Kann ich bei jedem Durchlauf alle Daten aus diesem Thema löschen? Ich möchte jedes Mal nur neue Daten in dem Thema. Gibt es eine Möglichkeit, das Thema irgendwie neu zu initialisieren?

63
TommyT

Denke nicht, dass es noch unterstützt wird. Werfen Sie einen Blick auf diese JIRA-Ausgabe "Hinzufügen von Hilfethemen zum Löschen".

Manuell löschen:

  1. Fahren Sie den Cluster herunter
  2. Bereinigen Sie das Kafka-Protokollverzeichnis (angegeben durch das log.dir-Attribut in kafka config file) sowie die Zookeeper-Daten
  3. Starten Sie den Cluster erneut

Für jedes gegebene Thema können Sie Folgendes tun

  1. Stoppen Sie Kafka
  2. Bereinigen Sie das kafka-spezifische Protokoll für die Partition. Kafka speichert seine Protokolldatei in einem Format "logDir/topic-partition". Für ein Thema mit dem Namen "MyTopic" wird das Protokoll für Partitions-ID 0 in /tmp/kafka-logs/MyTopic-0 gespeichert, wobei /tmp/kafka-logs durch das log.dir-Attribut angegeben wird
  3. Starten Sie Kafka neu

Dies ist NOT ein guter und empfohlener Ansatz, sollte aber funktionieren. In der Kafka-Broker-Konfigurationsdatei wird das log.retention.hours.per.topic-Attribut verwendet, um The number of hours to keep a log file before deleting it for some specific topic zu definieren.

Gibt es auch eine Möglichkeit, die Nachrichten zu löschen, sobald der Verbraucher sie liest?

Aus der Kafka-Dokumentation :

Der Kafka-Cluster behält alle veröffentlichten Nachrichten für einen konfigurierbaren Zeitraum bei, unabhängig davon, ob sie verbraucht wurden oder nicht. Wenn beispielsweise die Aufbewahrungszeit des Protokolls auf zwei Tage festgelegt ist, steht es für die zwei Tage nach der Veröffentlichung einer Nachricht für den Verbrauch zur Verfügung. Danach wird es verworfen, um Speicherplatz freizugeben. Die Leistung von Kafka ist in Bezug auf die Datengröße praktisch konstant, so dass die Aufbewahrung vieler Daten kein Problem ist.

Tatsächlich sind die einzigen Metadaten, die pro Verbraucher gespeichert werden, die Position des Verbrauchers im Protokoll, die als "Offset" bezeichnet wird. Dieser Versatz wird vom Konsumenten gesteuert: Normalerweise wird der Versatz beim Lesen der Nachrichten linear verschoben, die Position wird jedoch vom Konsumenten gesteuert und kann Nachrichten in beliebiger Reihenfolge verbrauchen. Zum Beispiel kann ein Verbraucher auf einen älteren Offset zurückgesetzt werden, um ihn erneut zu verarbeiten.

Um das Start-Offset zu finden, lesen Sie in Kafka 0.8 Simple Consumer Beispiel sagen sie

Kafka enthält zwei Konstanten, um zu helfen: kafka.api.OffsetRequest.EarliestTime() findet den Anfang der Daten in den Protokollen und beginnt mit dem Streaming von dort. kafka.api.OffsetRequest.LatestTime() wird nur neue Nachrichten streamen.

Dort finden Sie auch den Beispielcode zum Verwalten des Offsets auf der Verbraucherseite.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
45
Hild

Wie ich hier erwähnte Purge Kafka Queue :

In Kafka 0.8.2 für das Schnellstart-Beispiel getestet: Fügen Sie zunächst eine Zeile zur Datei server.properties im Ordner config hinzu:

delete.topic.enable=true

dann können Sie diesen Befehl ausführen:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
42
Patrick

Getestet mit kafka 0.10

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

Hinweis: Wenn Sie Themenordner innerhalb von kafka-logs löschen, nicht jedoch aus dem zookeeper-data-Ordner, sehen Sie, dass Themen immer noch vorhanden sind.

13
Swadeshi

Nachfolgend finden Sie Skripts zum Leeren und Löschen eines Kafka-Themas, wobei localhost als zookeeper-Server und Kafka_Home auf das Installationsverzeichnis gesetzt ist:

Das folgende Skript wird empty ein Thema, indem es die Aufbewahrungszeit auf 1 Sekunde setzt und die Konfiguration dann entfernt:

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

Um vollständig löschen topics müssen Sie alle zutreffenden Kafka-Broker stoppen und die Verzeichnisse aus dem Kafka-Protokollverzeichnis entfernen (Standard:/tmp/kafka-logs). Führen Sie dann dieses Skript aus, um das Thema zu entfernen Zoowärter Um zu bestätigen, dass es aus zookeeper gelöscht wurde, sollte die Ausgabe von ls/brokers/topics das Thema nicht mehr enthalten: 

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-Shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF
6
vdlen

Wir haben ziemlich viel ausprobiert, was die anderen Antworten mit mäßigem Erfolg beschreiben. Was für uns wirklich funktionierte (Apache Kafka 0.8.1), ist der Klassenbefehl

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181

5
Dan M

Als unkorrekte Problemumgehung können Sie die Einstellungen für die Laufzeitbeibehaltung pro Thema anpassen, z. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1 ( retention.bytes = 0 könnte auch funktionieren)

Nach kurzer Zeit sollte Kafka den Platz freigeben. Nicht sicher, ob dies Auswirkungen hat, wenn man das Thema neu erstellt. 

ps. Besser bringen Sie die Retentionseinstellungen zurück, sobald die Reinigung abgeschlossen ist.

Sie können auch retention.ms verwenden, um historische Daten zu erhalten

5
Ivan Balashov

Alle Daten zu Themen und ihren Partitionen werden in tmp/kafka-logs/ gespeichert. Außerdem werden sie in einem Format topic-partionNumber gespeichert. Wenn Sie ein Thema newTopic löschen möchten, können Sie:

  • stop Kafka
  • lösche die Dateien rm -rf /tmp/kafka-logs/newTopic-*
2
Salvador Dali

Für brauende Benutzer

Wenn Sie brew wie mich verwenden und viel Zeit damit verbringen, nach dem berüchtigten kafka-logs-Ordner zu suchen, müssen Sie keine Angst mehr haben. (und bitte lassen Sie mich wissen, ob dies für Sie und mehrere verschiedene Versionen von Homebrew, Kafka usw. funktioniert :))

Sie finden es wahrscheinlich unter:

Ort:

/usr/local/var/lib/kafka-logs


Wie finde ich diesen Weg?

(Dies ist auch für praktisch jede App hilfreich, die Sie durch Brühen installieren.)

1) brew services list

kafka startete matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist

2) Öffnen Sie und lesen Sie die oben gefundene plist

3) Finden Sie die Zeile, die server.properties Position definiert, und öffnen Sie sie. In meinem Fall:

  • /usr/local/etc/kafka/server.properties

4) Suchen Sie nach der log.dirs-Zeile:

log.dirs =/usr/local/var/lib/kafka-logs

5) Gehen Sie zu diesem Speicherort und löschen Sie die Protokolle für die gewünschten Themen

6) Kafka mit brew services restart kafka neu starten

1
Matheus Felipe
  1. Stoppen Sie ZooKeeper und Kafka 
  2. Ändern Sie in server.properties den Wert für log.retention.hours. Sie können log.retention.hours kommentieren und log.retention.ms=1000 hinzufügen. Es würde nur eine Sekunde lang das Kafka-Thema aufzeichnen. 
  3. Zookeeper und Kafka starten. 
  4. Überprüfen Sie die Verbraucherkonsole. Als ich die Konsole zum ersten Mal öffnete, war der Rekord da. Als ich die Konsole jedoch wieder öffnete, wurde der Datensatz entfernt. 
  5. Später können Sie den Wert von log.retention.hours auf die gewünschte Zahl setzen.
1
earl

Ich benutze dieses Skript:

#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done

Beim manuellen Löschen eines Themas aus einem Kafka-Cluster könnten Sie einfach https://github.com/darrenfu/bigdata/issues/6 .__ auschecken. Ein wichtiger Schritt, der in der meisten Lösung oft übersehen wurde, ist das Löschen der /config/topics/<topic_name> in ZK. 

0