今天小编给大家分享一下springboot+kafka中@KafkaListener动态指定多个topic怎么实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener首先,application.properties中配置用逗号隔开的多个topic。方法:利用Spring的SpEl表达免费云主机域名式,将topics 配置为:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”)运行程序,console打印的效果如下:因为只开了一条消费者线程,所以所有的topic和分区都分配给这条线程。如果你想开多条消费者线程去消费这些topic,添加@KafkaListener注解的参数concurrency的值为自己想要的消费者个数即可(注意,消费者数要小于等于你开的所有topic的分区数总和)运行程序,console打印的效果如下:如何在程序运行的过程中,改变topic,消费者能够消费修改后的topic?ans: 经过尝试,使用@KafkaListener注解实现不了此需求,在程序启动的时候,程序就会根据@KafkaListener的注解信息初始化好消费者去消费指定好的topic。如果在程序运行的过程中,修改topic,不会让此消费者修改消费者的配置再重新订阅topic的。不过我们可以有个折中的办法,就是利用@KafkaListener的topicPattern参数来进行topic匹配。不使用@KafkaListener,使用kafka原生客户端依赖,手动初始化消费者,开启消费者线程。在消费者线程中,每次循环都从配置、数据库或者其他配置源获取最新的topic信息,与之前的topic比较,如果发生变化,重新订阅topic或者初始化消费者。加入kafka客户端依赖(本次测试服务端kafka版本:2.12-2.4.0)说一下第72行代码:上面这行代码表示:在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回。在修改topic后,必须等到此次poll拉取的消息处理完,while(true)循环的时候检测topic发生变化,才能重新订阅topic.poll()方法一次拉取得消息数默认为:500,如下图,kafka客户端源码中设置的。如果想自定义此配置,可在初始化消费者时加入运行结果(测试的topic中都无数据)注意:KafkaConsumer是线程不安全的,不要用一个KafkaConsumer实例开启多个消费者,要开启多个消费者,需要new 多个KafkaConsumer实例。以上就是“springboot+kafka中@KafkaListener动态指定多个topic怎么实现”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注百云主机行业资讯频道。
这篇“php url参数中文乱码如何解决”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“php url参数中文乱码如何解决”文章吧。 php ur…
免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。