设置参数:
spring.cloud.stream.rabbit.bindings.<channelName>.consumer.prefetch=1001
spring.cloud.stream.rabbit.default.prefetch=1002 #这个不生效
RabbitMQ Consumer Properties :: Spring Cloud Stream
注意:prefetch生效的前提是acknowledge-mode: 不能为none,推荐设置acknowledge-mode: auto。
baseQos是一个方法,可以设置PrefetchCount的值。
RabbitMQ中的QoS(Quality of Service,服务质量)机制允许消费者控制从队列中接收消息的速率,特别是通过basic.qos
方法和prefetch count
参数来实现。
baseQos方法
basic.qos
是AMQP协议中定义的方法,用于设置消费者端的服务质量参数。在RabbitMQ中,它主要用于:
- 限制消费者未确认消息的最大数量(prefetch count)
- 设置预取消息的字节大小(prefetch size)
- 设置是否应用于整个通道(global)
PrefetchCount参数详解
prefetchCount
是QoS机制中最常用的参数,它决定了:
- 消费者可以同时处理的最大未确认消息数量
- 一旦达到这个数量,RabbitMQ将停止向该消费者推送新消息
- 只有当消费者确认(ACK)了部分消息后,才会继续接收新消息
使用场景
- 负载均衡:在多个消费者之间公平分配消息
- 流量控制:防止消费者过载
- 内存管理:限制消费者端未处理消息的内存占用
设置建议
- 低值(如1):严格轮询分发,确保绝对公平,但可能降低吞吐量
- 中等值(如10-100):平衡公平性和吞吐量
- 高值(如0,表示无限制):最大吞吐量,但可能导致负载不均
注意事项
prefetchCount=0
表示不限制,可能导致某个消费者获取所有消息- 设置过小的prefetch count可能降低系统吞吐量
- 设置过大的prefetch count可能导致消费者内存问题
- 在集群环境中,需要考虑网络延迟对QoS的影响
一、遇到问题
Consumer InternalConsumer{queue='xx.xx', consumerTag='amq.ctag-ILknYRUcG5n2Oa2NNPS3eQ'} (amq.ctag-ILknYRUcG5n2Oa2NNPS3eQ) method handleDelivery for channel AMQChannel(amqp://xx@172.20.0.56:6560/,6) threw an exception for channel AMQChannel(amqp://xx@172.20.0.56:6560/,6) java.lang.OutOfMemoryError: Java heap space at com.rabbitmq.client.impl.CommandAssembler.coalesceContentBody(CommandAssembler.java:142) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.CommandAssembler.getContentBody(CommandAssembler.java:154) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.AMQCommand.getContentBody(AMQCommand.java:104) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.ChannelN.processDelivery(ChannelN.java:471) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:357) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:193) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:125) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:761) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.AMQConnection.access$400(AMQConnection.java:48) ~[amqp-client-5.19.0.jar!/:5.19.0] at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:688) ~[amqp-client-5.19.0.jar!/:5.19.0] at java.base/java.lang.Thread.runWith(Thread.java:1596) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na] 2025-04-15T10:14:53.663+08:00 ERROR 1 --- [iios-data-dispatch-service] [72.20.0.56:6560] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=Closed due to exception from Consumer (amq.ctag-ILknYRUcG5n2Oa2NNPS3eQ) method handleDelivery for channel AMQChannel(amqp://xx@172.20.0.56:6560/,6), class-id=0, method-id=0)
二、问题解决
spring:
cloud:
stream:
rabbit:
bindings:
default:
consumer:
prefetch: 100 # 推荐生产环境初始值