消息生产rabbitmq-provider
pom.xml添加
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
application.yml 添加
spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 192.168.3.137 port: 5672 username: root password: root #虚拟host 可以不设置,使用server默认host virtual-host: / #消息确认配置项 #确认消息已发送到交换机(Exchange) publisher-confirm-type: correlated #确认消息已发送到队列(Queue) publisher-returns: true
Service代码
@Slf4j @Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("**ConfirmCallback: "+"相关数据:"+correlationData); log.info("**ConfirmCallback: "+"确认情况:"+ack); log.info("**ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage message) { log.info("^^ReturnCallback: "+"消息:"+message); } }); return rabbitTemplate; } }
/** * 直连型交换机 - 生产者 */ @Slf4j @Configuration public class AlertDirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue AlertDirectQueue() { log.info("Alert直连型队列AlertDirectQueue"); // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("AlertDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("AlertDirectQueue",true); } //Direct交换机 起名:AlertDirectExchange @Bean DirectExchange AlertDirectExchange() { log.info("Alert直连型交换机AlertDirectExchange"); // return new DirectExchange("AlertDirectExchange",true,true); return new DirectExchange("AlertDirectExchange",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:AlertDirectRouting @Bean Binding bindingDirect() { log.info("Alert直连型队列绑定到上交换机,主键为AlertDirectRouting"); return BindingBuilder.bind(AlertDirectQueue()).to(AlertDirectExchange()).with("AlertDirectRouting"); } }
Controller 代码
@RestController @Slf4j public class SendAlertController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 @GetMapping("/sendAlertMessage") public String sendAlertMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "alert message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //将消息携带绑定键值:AlertDirectRouting 发送到交换机AlertDirectExchange rabbitTemplate.convertAndSend("AlertDirectExchange", "AlertDirectRouting", map); return "ok"; } }
消费消息rabbitmq-consumer
pom.xml添加
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
application.yml 添加
spring: #给项目来个名字 application: name: rabbitmq-consumer #配置rabbitMq 服务器 rabbitmq: host: 192.168.3.137 port: 5672 username: root password: root #虚拟host 可以不设置,使用server默认host virtual-host: / #消息确认配置项 #确认消息已发送到交换机(Exchange) publisher-confirm-type: correlated #确认消息已发送到队列(Queue) publisher-returns: true
Service代码
@Configuration public class MessageListenerConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private AlertAckReceiver alertAckReceiver; @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); // RabbitMQ默认是自动确认,这里改为手动确认消息 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置一个队列 container.setQueueNames("AlertDirectQueue"); //如果同时设置多个如下: 前提是队列都是必须已经创建存在的 // container.setQueueNames("TestDirectQueue","fanout.A"); //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues //container.setQueues(new Queue("TestDirectQueue",true)); //container.addQueues(new Queue("TestDirectQueue2",true)); //container.addQueues(new Queue("TestDirectQueue3",true)); container.setMessageListener(alertAckReceiver); return container; } }
@Slf4j @Component public class AlertAckReceiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理 String msg = message.toString(); //可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据 String[] msgArray = msg.split("'"); Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3); String messageId=msgMap.get("messageId"); String messageData=msgMap.get("messageData"); String createTime=msgMap.get("createTime"); log.info(" AlertAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime); log.info("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue()); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 channel.basicAck(deliveryTag, true); //第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝 // channel.basicReject(deliveryTag, true); if ("AlertDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){ log.info("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue()); log.info("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime); log.info("执行AlertDirectQueue中的消息的业务处理流程......"); } if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){ log.info("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue()); log.info("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime); log.info("执行fanout.A中的消息的业务处理流程......"); } log.info(".................................................................."); } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } private Map<String, String> mapStringToMap(String str, int entryNum) { log.info(str); str = str.substring(1, str.length() - 1); String[] strs = str.split(",",entryNum); Map<String, String> map = new HashMap<String, String>(); for (String string : strs) { String key = string.split("=")[0].trim(); String value = string.split("=")[1]; map.put(key, value); } return map; } }