• 作者:老汪软件技巧
  • 发表时间:2024-11-29 21:03
  • 浏览量:

Topic 是什么

Topic就是主题,相同业务可以放同一个主题,我们可以类比MySQL,一类数据就放在一张表里,而消息队列中,某类数据就可以放入一个主题,这其实可以看作数据分片的一种方式。

比如秒杀消息,就放入秒杀主题,比如短信消息,就放入短信主题,通过主题Kafka实现了业务的隔离,从主题A拿到的消息一定是A对应类型的消息,消费者可以做对应的处理,试想如果各式各样的消息混在一起,处理起来得多乱。

怎么创建Topic

首先进入容器中的/opt/kafka/bin目录。

执行如下命令:

./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic niugetest

执行之后会有如下返回:

如果这时候再执行一遍,会提示主题已存在,这就说明我们之前的主题是已经创建成功了:

怎么查询 Topic

./kafka-topics.sh --list --bootstrap-server localhost:9092

这时候会有如下返回,可以看到能查询到我们之前创建的niugetest这个主题:

我们再用命令创建一个新主题 niugetest2

./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic niugetest2

创建成功后再查询一波

./kafka-topics.sh --list --bootstrap-server localhost:9092

返回中包含了niugetest和niugetest2两个主题,符合预期:

我们还可以通过如下命令,来查看主题的更多信息:

./kafka-topics.sh --describe --bootstrap-server localhost:9092

这里有一些Leader和Replicas的信息,这些集群相关概念我们后面会讲解,这里有个印象即可。

如果只想查看某个主题的信息,可以用如下命令:

./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic niugetest2

怎么删除Topic

执行如下删除语句,没有异常就是成功:

./kafka-topics.sh --delete --bootstrap-server  localhost:9092 --topic niugetest2

操作之后再查询一波

./kafka-topics.sh --list --bootstrap-server localhost:9092

开门见上,从 Topic 讲起  | 豆包MarsCode AI刷题_开门见上,从 Topic 讲起  | 豆包MarsCode AI刷题_

返回如下,已经没有niugetest2,符合预期

代码中怎么使用 Topic

前面我们已经理解了Topic是什么,有什么作用,在业务开发中使用起来其实非常简单,就非常类似MySQL的分表,只要指定向哪个主题做什么操作即可。

比如我们在应用场景章节演示过的解耦例子,就是向tp-mq-decoupling这个主题,发送一条叫"come on"的信息。

在业务开发时候,也可以不必提前创建主题,只要向某个主题发送消息,他就自动创建了

@PostMapping(value = "/decoupling_with_mq", consumes = "application/json; charset=utf-8")
public ResponseEntity decouplingWithMQ(@RequestBody IncrCountReq data) {
    String msg = "come on";
    kafkaTemplate.send("tp-mq-decoupling",msg);
    return ResponseEntity.ok();
}

消费的时候也是直接指定tp-mq-decoupling这个主题即可,如下面代码,在注解中制定了topics = "tp-mq-decoupling",这段代码就会不断去消费tp-mq-decoupling中的信息:

@KafkaListener(topics = "tp-mq-decoupling", groupId = "TEST_GROUP",concurrency = "1", containerFactory = "kafkaManualAckListenerContainerFactory")
public void topic_test(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    Optional message = Optional.ofNullable(record.value());
    if (message.isPresent()) {
        Object msg = message.get();
        System.out.println("收到Kafka消息! Topic:" + topic + ",Message:" + msg);
        try {
            couontSerivce.incrManyTimes(10000000);
            ack.acknowledge();
            log.info("Kafka消费成功! Topic:" + topic + ",Message:" + msg);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Kafka消费失败!Topic:" + topic + ",Message:" + msg, e);
        }
    }
}

这里只是演示了怎么向主题发消息、消费消息这两个最核心的交互,无论是Java/Go的Kafka SDK都是能支持更多的操作,比如我们上面提到的主题创建、主题查询、主题删除,也都是可以在代码中完成的,不过一般而言是不会在业务服务里直接去做的。

Topic存在哪里

简单描述的话,可以认为topic是存在服务器上的,Kafka把服务器称作Broker,如果是集群环境下,一个topic实际会跨多个Broker。

实际上,topic在Kafka里其实是一个逻辑上的概念,也就是架构和逻辑上有主题这个概念,但是实际存储的时候不是以一个主题来,而是将主题划分为分片来存储,也就是说Kafka会将主题分片跨Broker进行存储。

总结

主题是Kafka中非常核心的概念,Kafka的使用方会直接和主题打交道,比如某条消息一定要指定是发给哪个主题的,下一节我们会介绍Partition,可以理解为主题分片。