今天小编给大家分享一下springkafka@KafkaListener如何使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。您不能通过这种方式指定group.id和client.id属性。他们将被忽略;可以使用#{…}或属性占位符(${…})在SpEL上配置注释上的大多数属性。比如:属性concurrency
将会从容器中获取listen.concurrency
的值,如果不存在就默认用3①. 消费者线程命名规则填写:2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[consumer-id5-1-C-1
,5,main]-groupId:BASE-DEMO consumer-id5 消费没有填写ID:2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
,5,main] consumer-id7②.在相同容器中的监听器ID不能重复否则会报错Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id③.会覆盖消费者工厂的消费组GroupId假如配置文件属性配置了消费组kafka.consumer.group-id=BASE-DEMO
正常情况它是该容器中的默认消费组
但是如果设置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么当前消费者的消费组就是consumer-id7
;当然如果你不想要他作为groupId的话 可以设置属性idIsGroup = false
;那么还是会使用默认的GroupId;④. 如果配置了属性groupId,则其优先级最高@KafkaListener(id = “consumer-id5”,idIsGroup = false,topics = “SHI_TOPIC3”,groupId = “groupId-test”)例如上面代码中最终这个消费者的消费组GroupId
是 “groupId-test”该id属性(如果存在)将用作Kafka消费者group.id属性,并覆盖消费者工厂中的已配置属性(如果存在)您还可以groupId显式设置或将其设置idIsGroup为false,以恢复使用使用者工厂的先前行为group.id。指定该消费组的消费组名; 关于消费组名的配置可以看看上面的 id 监听器的id如何获取消费者 group.id在监听器中调用KafkaUtils.getConsumerGroupId()
可以获得当前的groupId; 可以在日志中打印出来; 可以知道是哪个客户端消费的;topics 指定要监听哪些topic(与topicPattern、topicPartitions 三选一)可以同时监听多个topics = {"SHI_TOPIC3","SHI_TOPIC4"}
topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一) topicPartitions 显式分区分配可以为监听器配置明确的主题和分区(以及可选的初始偏移量)上面例子意思是 监听topic1
的0,1免费云主机域名分区;监听topic2
的第0分区,并且第1分区从offset为100的开始消费;实现KafkaListenerErrorHandler
; 然后做一些异常处理;调用的时候 填写beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"
指定生成监听器的工厂类;例如我写一个 批量消费的工厂类使用containerFactory = "batchFactory"
clientIdPrefix 客户端前缀会覆盖消费者工厂的kafka.consumer.client-id
属性; 最为前缀后面接 -n
n是数字concurrency并发数会覆盖消费者工厂中的concurrency ,这里的并发数就是多线程消费; 比如说单机情况下,你设置了3; 相当于就是启动了3个客户端来分配消费分区;分布式情况 总线程数=concurrency*机器数量; 并不是设置越多越好,具体如何设置请看Java concurrency之集合虽然使用的工厂是concurrencyFactory
(concurrency配置了6); 但是他最终生成的监听器数量 是1;kafka中的属性看org.apache.kafka.clients.consumer.ConsumerConfig
;
同名的都可以修改掉;用法KafkaListenerEndpointRegistry当您将Spring Boot与验证启动器一起使用时,将LocalValidatorFactoryBean自动配置:如下使用spring-kafka官方文档官方文档: https://docs.spring.io/spring-kafka/reference/html/@KafkaListener
The@KafkaListener
annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in aMessagingMessageListenerAdapter
configured with various features, such as converters to convert the data, if necessary, to match the method parameters.If, say, sixTopicPartition
instances are provided and theconcurrency
is3
; each container gets two partitions. For fiveTopicPartition
instances, two containers get two partitions, and the third gets one. If theconcurrency
is greater than the number ofTopicPartitions
, theconcurrency
is adjusted down such that each container gets one partition.You can now configure aKafkaListenerErrorHandler
to handle exceptions. SeeHandling Exceptionsfor more information.By default, the@KafkaListener
id
property is now used as thegroup.id
property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure thegroupId
on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use differentgroup.id
values for listeners. To restore the previous behavior of using the factory configuredgroup.id
, set theidIsGroup
property on the annotation tofalse
.示例: demo类:以上就是“springkafka@KafkaListener如何使用”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注百云主机行业资讯频道。
相关推荐: Spring Boot小型项目怎么使用异步任务管理器实现不同业务间的解耦
本篇内容介绍了“SpringBoot小型项目怎么使用异步任务管理器实现不同业务间的解耦”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!顾名思义,就是用来对异步任务…
免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。