wake-up-neo.com

Testen eines @KafkaListener mit Spring Embedded Kafka

Ich versuche, einen Komponententest für einen Kafka -Listener zu schreiben, den ich mit Spring Boot 2.x entwickle. Als Komponententest möchte ich keinen vollständigen Kafka Server starten, der eine Instanz von Zookeeper ist. Also habe ich mich für Spring Embedded Kafka entschieden.

Die Definition meines Zuhörers ist sehr einfach.

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

Auch der Test, bei dem überprüft wird, ob der Zähler latch nach dem Empfang einer Nachricht gleich Null ist, ist sehr einfach.

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

Leider schlägt der Test fehl und ich kann nicht verstehen warum. Ist es möglich, mit einer Instanz von KafkaEmbedded eine mit der Annotation @KafkaListener Gekennzeichnete Methode zu testen?

Der gesamte Code wird in meinem GitHub-Repository geteilt kafka-listener .

Dank an alle.

12
riccardo.cardin

Sie senden die Nachricht wahrscheinlich, bevor dem Verbraucher das Thema/die Partition zugewiesen wurde. Eigenschaft festlegen ...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

... ist standardmäßig latest.

Dies entspricht der Verwendung von --from-beginning Mit dem Konsolenbenutzer.

EDIT

Oh; Sie verwenden die Eigenschaften von boot nicht.

Hinzufügen

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

Übrigens sollten Sie wahrscheinlich auch eine get(10L, TimeUnit.SECONDS) für das Ergebnis der template.send() (a Future<>) Ausführen, um zu bestätigen, dass der Sendevorgang erfolgreich war.

EDIT

Um das Zurücksetzen des Offsets nur für den Test zu überschreiben, können Sie das Gleiche tun wie für die Brokeradressen:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

und

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

Beachten Sie jedoch, dass diese Eigenschaft nur beim erstmaligen Verwenden einer Gruppe angewendet wird. Um beim Start der App immer am Ende zu beginnen, müssen Sie beim Start nach dem Ende suchen.

Außerdem würde ich empfehlen, enable.auto.commit Auf false zu setzen, damit der Container die Offsets festlegt, anstatt sich nur darauf zu verlassen, dass der Consumer-Client dies nach einem Zeitplan vornimmt.

8
Gary Russell