SpringBoot 整合RabbitMq

消息生产rabbitmq-provider

pom.xml添加

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.yml 添加

spring:
  #给项目来个名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 192.168.3.137
    port: 5672
    username: root
    password: root
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true

Service代码

@Slf4j
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("**ConfirmCallback:     "+"相关数据:"+correlationData);
                log.info("**ConfirmCallback:     "+"确认情况:"+ack);
                log.info("**ConfirmCallback:     "+"原因:"+cause);
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage message) {
                log.info("^^ReturnCallback:     "+"消息:"+message);
            }
        });


        return rabbitTemplate;
    }
}
/**
 * 直连型交换机 - 生产者
 */
@Slf4j
@Configuration
public class AlertDirectRabbitConfig {
    //队列 起名:TestDirectQueue
    @Bean
    public Queue AlertDirectQueue() {
        log.info("Alert直连型队列AlertDirectQueue");
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("AlertDirectQueue",true,true,false);

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("AlertDirectQueue",true);
    }

    //Direct交换机 起名:AlertDirectExchange
    @Bean
    DirectExchange AlertDirectExchange() {
        log.info("Alert直连型交换机AlertDirectExchange");
        //  return new DirectExchange("AlertDirectExchange",true,true);
        return new DirectExchange("AlertDirectExchange",true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:AlertDirectRouting
    @Bean
    Binding bindingDirect() {
        log.info("Alert直连型队列绑定到上交换机,主键为AlertDirectRouting");
        return BindingBuilder.bind(AlertDirectQueue()).to(AlertDirectExchange()).with("AlertDirectRouting");
    }

}

Controller 代码

@RestController
@Slf4j
public class SendAlertController {
    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendAlertMessage")
    public String sendAlertMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "alert message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //将消息携带绑定键值:AlertDirectRouting 发送到交换机AlertDirectExchange
        rabbitTemplate.convertAndSend("AlertDirectExchange", "AlertDirectRouting", map);
        return "ok";
    }

}

 

消费消息rabbitmq-consumer

pom.xml添加

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.yml 添加

spring:
  #给项目来个名字
  application:
    name: rabbitmq-consumer
  #配置rabbitMq 服务器
  rabbitmq:
    host: 192.168.3.137
    port: 5672
    username: root
    password: root
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true

Service代码

@Configuration
public class MessageListenerConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private AlertAckReceiver alertAckReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //设置一个队列
        container.setQueueNames("AlertDirectQueue");
        //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
//        container.setQueueNames("TestDirectQueue","fanout.A");


        //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
        //container.setQueues(new Queue("TestDirectQueue",true));
        //container.addQueues(new Queue("TestDirectQueue2",true));
        //container.addQueues(new Queue("TestDirectQueue3",true));

        container.setMessageListener(alertAckReceiver);
        return container;
    }
}
@Slf4j
@Component
public class AlertAckReceiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理
            String msg = message.toString();
            //可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
            String[] msgArray = msg.split("'");

            Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3);

            String messageId=msgMap.get("messageId");
            String messageData=msgMap.get("messageData");
            String createTime=msgMap.get("createTime");
            log.info("  AlertAckReceiver  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
            log.info("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
            //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
            channel.basicAck(deliveryTag, true);
            //第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
            //			channel.basicReject(deliveryTag, true);


            if ("AlertDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
                log.info("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
                log.info("消息成功消费到  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
                log.info("执行AlertDirectQueue中的消息的业务处理流程......");

            }

            if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
                log.info("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
                log.info("消息成功消费到  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
                log.info("执行fanout.A中的消息的业务处理流程......");

            }
            log.info("..................................................................");

        } catch (Exception e) {
            channel.basicReject(deliveryTag, false);
            e.printStackTrace();
        }
    }

    private Map<String, String> mapStringToMap(String str, int entryNum) {
        log.info(str);
        str = str.substring(1, str.length() - 1);
        String[] strs = str.split(",",entryNum);
        Map<String, String> map = new HashMap<String, String>();
        for (String string : strs) {
            String key = string.split("=")[0].trim();
            String value = string.split("=")[1];
            map.put(key, value);
        }
        return map;
    }
}

 

滚动至顶部