本文共 4808 字,大约阅读时间需要 16 分钟。
RabbitMq消息消费者服务
开发工具Idea和Spring boot来开发的。
消息消费目前只是一个简单的Demo,后续会处理成更智能一些。
首先配置文件类,RabbitMqConfig,里面配置一些用户名和密码嗨哟队列信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | package com.basic.rabbitmq.consumer.config; import com.basic.rabbitmq.consumer.listener.HandleMessageListenerAdapter; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.core.env.Environment; import com.rabbitmq.client.ConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; /** * Rabbitmq配置类 * Created by sdc on 2017/7/4. */ @Configuration @ComponentScan (basePackages = { "com.basic" }) @PropertySource (value = { "classpath:application.properties" }) public class RabbitMqConfig { @Autowired private Environment env; /** * 构建connectionfactory * @return * @throws Exception */ @Bean public ConnectionFactory connectionFactory() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(env.getProperty( "spring.rabbitmq.host" )); connectionFactory.setPort(Integer.valueOf( "5672" .trim())); connectionFactory.setVirtualHost( "/" ); connectionFactory.setUsername(env.getProperty( "spring.rabbitmq.username" )); connectionFactory.setPassword(env.getProperty( "spring.rabbitmq.password" )); return connectionFactory; } /** * CachingConnectionFactory * @return * @throws Exception */ @Bean public CachingConnectionFactory cachingConnectionFactory() throws Exception { return new CachingConnectionFactory(connectionFactory()); } /** * RabbitTemplate,类似于jdbctemplate一样的工具类 * @return * @throws Exception */ @Bean public RabbitTemplate rabbitTemplate() throws Exception { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory()); rabbitTemplate.setChannelTransacted( true ); return rabbitTemplate; } @Bean public AmqpAdmin amqpAdmin() throws Exception { return new RabbitAdmin(cachingConnectionFactory()); } @Bean public SimpleMessageListenerContainer listenerContainer( @Qualifier ( "handleMessageListenerAdapter" ) HandleMessageListenerAdapter handleMessageListenerAdapter) throws Exception { //队列名字 String queueName = env.getProperty( "emial.server.queue" ).trim(); //单一的消息监听容器 SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(cachingConnectionFactory()); simpleMessageListenerContainer.setQueueNames(queueName); simpleMessageListenerContainer.setMessageListener(handleMessageListenerAdapter); //手动设置 ACK,就是成功消费信息了,就设置一下这个,rabbitmq就从此队列里删除这条信息了。 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); return simpleMessageListenerContainer; } } |
我这里配置了一个SimpleMessageListenerContainer,这个Bean,用来监听队列里的消息的。
具体的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | package com.basic.rabbitmq.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.ComponentScan; import org.springframework.mail.MailMessage; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 监听消息的处理适配器 * Created by sdc on 2017/7/10. */ @Component ( "handleMessageListenerAdapter" ) public class HandleMessageListenerAdapter extends MessageListenerAdapter { // @Resource // private JavaMailSender mailSender; /** * 这块和activemq那个监听器差不多,都是监听信息,也都是onMessage方法。 * @param message * @param channel * @throws Exception */ @Override public void onMessage(Message message, Channel channel) throws Exception { String messageDetail = new String(message.getBody()); //消息体 System.out.println( "消息消费:" + messageDetail); // 手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } } |
还有一些配制文件,请看
这个博客,就可以看到具体的配制了。
启动这个项目,就可以从队列消费消息了。消费者还是比较简单的,对应到相应的队列就可以处理了消息了。
本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1945974
转载地址:http://ywflx.baihongyu.com/