RabbitMQ队列控制

队列控制功能列表

  • 延迟队列
  • 删除队列
  • 优先队列
  • 重入队列
  • 切换队列
  • 定时队列

优先队列

  • queue参数配置上面绑定x-max-priority参数
  • consumer接收时也需要绑定x-max-priority参数
  • message上面绑定priority值,若超出maxPriority则优先级按0进行处理
  • 若queue一直被监听,此时好像优先级设定不起作用;即consumer模式不行,可以使用get模式

延迟队列

  • RabbitMQ 3.5.3版本官方出了一个延时加载的插件 rabbitmq-delayed-message-exchange 可以更方便解决这样的问题
  • 也可以使用原生的ttl和dlx的方式进行处理,本文是以ttl+dlx的方式进行处理

SpringBOOT 延时队列 Go实现延时队列

rabbitmq-dlx

  • delay_exchange与delay_queue进行bind
  • dlx exchange与dlx_queue进行bind
  • delay_queue里面设置message ttl为10s,并将x-dead-letter-exchange为dlx
  • 若delay_queue消息进行了以下的问题,则会被放至dlx当中
    • The message is rejected (basic.reject or basic.nack) with requeue=false,
    • The TTL for the message expires;
    • The queue length limit is exceeded.

设置delayQueue,dlxQueue,delayExchange,dlx

  	@Bean
    Queue delayQueue() {
        Map<String, Object> param = new HashMap<>(2);
        param.put("x-dead-letter-exchange", RabbitConstant.DEAD_LETTER_EXCHANGE);
        // message ttl time 单位ms
        param.put("x-message-ttl", 10000);

        return new Queue(RabbitConstant.DELAY_QUEUE, false, false, false, param);
    }

    @Bean
    Queue dlxQueue() {
        return new Queue(RabbitConstant.DLX_QUEUE);
    }


    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(RabbitConstant.DEAD_LETTER_EXCHANGE);
    }

    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(RabbitConstant.DELAY_EXCHANGE);
    }

    @Bean
    public Binding delayBind(Queue delayQueue, DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("");
    }

    @Bean
    public Binding dlxBind(Queue dlxQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(dlxQueue).to(deadLetterExchange).with("");
    }

sender && receive

	// sender
	public void ttlSend(Object object) {
        System.out.println("delayQueue sender---------------- " + object);
        rabbitTemplate.convertAndSend(RabbitConstant.DELAY_EXCHANGE, "", object);
    }

	// receiver
    @RabbitListener(queues = RabbitConstant.DLX_QUEUE)
    public void dlxReceive(Message object) {
        System.out.println("dlxQueue receive---------------- " + new String(object.getBody()));
    }

Test

/**
 * @author admin
 * @version V1.0 31/05/2018 admin Exp $
 * @description
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DlxTest {

    @Autowired
    private DirectSend directSend;

    @Test
    public void ttlDlxTest() {
        directSend.ttlSend("hello ttl && dlx");

        // 由于设置message ttl为10s,所以设置Test线程停留11s,保证dlx queue可以收到消息
        try {
            Thread.sleep(11000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

说明

  • dead_letter_exchange与dlxQeueu没有绑定routeKey,若绑定了routeKey,那么在消息的配置参数【x-dead-letter-routing-key】也应该绑定对应的routeKey,这样才是一条通的消息链路