Ich versuche, einen Komponententest für einen Kafka -Listener zu schreiben, den ich mit Spring Boot 2.x entwickle. Als Komponententest möchte ich keinen vollständigen Kafka Server starten, der eine Instanz von Zookeeper ist. Also habe ich mich für Spring Embedded Kafka entschieden.
Die Definition meines Zuhörers ist sehr einfach.
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
Auch der Test, bei dem überprüft wird, ob der Zähler latch
nach dem Empfang einer Nachricht gleich Null ist, ist sehr einfach.
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> buildKafkaTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
return new KafkaTemplate<>(pf);
}
@Test
public void listenerShouldConsumeMessages() throws InterruptedException {
// Given
producer.sendDefault(1, "Hello world");
// Then
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
}
}
Leider schlägt der Test fehl und ich kann nicht verstehen warum. Ist es möglich, mit einer Instanz von KafkaEmbedded
eine mit der Annotation @KafkaListener
Gekennzeichnete Methode zu testen?
Der gesamte Code wird in meinem GitHub-Repository geteilt kafka-listener .
Dank an alle.
Sie senden die Nachricht wahrscheinlich, bevor dem Verbraucher das Thema/die Partition zugewiesen wurde. Eigenschaft festlegen ...
spring:
kafka:
consumer:
auto-offset-reset: earliest
... ist standardmäßig latest
.
Dies entspricht der Verwendung von --from-beginning
Mit dem Konsolenbenutzer.
EDIT
Oh; Sie verwenden die Eigenschaften von boot nicht.
Hinzufügen
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
EDIT2
Übrigens sollten Sie wahrscheinlich auch eine get(10L, TimeUnit.SECONDS)
für das Ergebnis der template.send()
(a Future<>
) Ausführen, um zu bestätigen, dass der Sendevorgang erfolgreich war.
EDIT
Um das Zurücksetzen des Offsets nur für den Test zu überschreiben, können Sie das Gleiche tun wie für die Brokeradressen:
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
und
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
Beachten Sie jedoch, dass diese Eigenschaft nur beim erstmaligen Verwenden einer Gruppe angewendet wird. Um beim Start der App immer am Ende zu beginnen, müssen Sie beim Start nach dem Ende suchen.
Außerdem würde ich empfehlen, enable.auto.commit
Auf false
zu setzen, damit der Container die Offsets festlegt, anstatt sich nur darauf zu verlassen, dass der Consumer-Client dies nach einem Zeitplan vornimmt.