其实,Kafka 不需要用户手动删除消息。它本身提供了留存策略,能够自动删除过期消息。 当 然,它是支持手动删除消息的。因此,你最好从这两个维度去回答。

对于设置了 Key 且参数 cleanup.policy=compact 的主题而言,我们可以构造一条 <Key, null> 的消息发送给 Broker,依靠 Log Cleaner 组件提供的功能删除掉该 Key 的消息。 对于普通主题而言,我们可以使用 kafka-delete-records 命令,或编写程序调用 Admin.deleteRecords 方法来删除消息。这两种方法殊途同归,底层都是调用 Admin 的 deleteRecords 方法,通过将分区 Log Start Offset 值抬高的方式间接删除消息。