博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring BOOT 集成 RabbitMq 实战操作(一)
阅读量:5986 次
发布时间:2019-06-20

本文共 4808 字,大约阅读时间需要 16 分钟。

RabbitMq消息消费者服务 

开发工具Idea和Spring boot来开发的。

消息消费目前只是一个简单的Demo,后续会处理成更智能一些。

首先配置文件类,RabbitMqConfig,里面配置一些用户名和密码嗨哟队列信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package 
com.basic.rabbitmq.consumer.config;
 
import 
com.basic.rabbitmq.consumer.listener.HandleMessageListenerAdapter;
import 
org.springframework.amqp.core.*;
import 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import 
org.springframework.amqp.rabbit.core.RabbitAdmin;
import 
org.springframework.amqp.rabbit.core.RabbitTemplate;
import 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import 
org.springframework.beans.factory.annotation.Qualifier;
import 
org.springframework.context.annotation.Bean;
import 
org.springframework.core.env.Environment;
import 
com.rabbitmq.client.ConnectionFactory;
import 
org.springframework.beans.factory.annotation.Autowired;
import 
org.springframework.context.annotation.ComponentScan;
import 
org.springframework.context.annotation.Configuration;
import 
org.springframework.context.annotation.PropertySource;
 
/**
 
* Rabbitmq配置类
 
* Created by sdc on 2017/7/4.
 
*/
@Configuration
@ComponentScan
(basePackages = {
"com.basic"
})
@PropertySource
(value = {
"classpath:application.properties"
})
public 
class 
RabbitMqConfig {
 
    
@Autowired
    
private 
Environment env;
 
    
/**
     
* 构建connectionfactory
     
* @return
     
* @throws Exception
     
*/
    
@Bean
    
public 
ConnectionFactory connectionFactory() 
throws 
Exception {
        
ConnectionFactory connectionFactory = 
new 
ConnectionFactory();
        
connectionFactory.setHost(env.getProperty(
"spring.rabbitmq.host"
));
        
connectionFactory.setPort(Integer.valueOf(
"5672"
.trim()));
        
connectionFactory.setVirtualHost(
"/"
);
        
connectionFactory.setUsername(env.getProperty(
"spring.rabbitmq.username"
));
        
connectionFactory.setPassword(env.getProperty(
"spring.rabbitmq.password"
));
        
return 
connectionFactory;
    
}
 
    
/**
     
* CachingConnectionFactory
     
* @return
     
* @throws Exception
     
*/
    
@Bean
    
public 
CachingConnectionFactory cachingConnectionFactory() 
throws 
Exception {
        
return 
new 
CachingConnectionFactory(connectionFactory());
    
}
 
    
/**
     
* RabbitTemplate,类似于jdbctemplate一样的工具类
     
* @return
     
* @throws Exception
     
*/
    
@Bean
    
public 
RabbitTemplate rabbitTemplate() 
throws  
Exception {
        
RabbitTemplate rabbitTemplate = 
new 
RabbitTemplate(cachingConnectionFactory());
        
rabbitTemplate.setChannelTransacted(
true
);
        
return 
rabbitTemplate;
    
}
 
    
@Bean
    
public 
AmqpAdmin amqpAdmin() 
throws  
Exception {
        
return 
new 
RabbitAdmin(cachingConnectionFactory());
    
}
 
    
@Bean
    
public 
SimpleMessageListenerContainer listenerContainer(
            
@Qualifier
(
"handleMessageListenerAdapter"
) HandleMessageListenerAdapter handleMessageListenerAdapter) 
throws 
Exception {
        
//队列名字
        
String queueName = env.getProperty(
"emial.server.queue"
).trim();
 
        
//单一的消息监听容器
        
SimpleMessageListenerContainer simpleMessageListenerContainer =
                
new 
SimpleMessageListenerContainer(cachingConnectionFactory());
        
simpleMessageListenerContainer.setQueueNames(queueName);
        
simpleMessageListenerContainer.setMessageListener(handleMessageListenerAdapter);
        
//手动设置 ACK,就是成功消费信息了,就设置一下这个,rabbitmq就从此队列里删除这条信息了。
        
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 
        
return 
simpleMessageListenerContainer;
    
}
 
 
}

我这里配置了一个SimpleMessageListenerContainer,这个Bean,用来监听队列里的消息的。

具体的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package 
com.basic.rabbitmq.consumer.listener;
 
        
import 
com.rabbitmq.client.Channel;
        
import 
org.springframework.amqp.core.Message;
        
import 
org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
        
import 
org.springframework.beans.factory.annotation.Autowired;
        
import 
org.springframework.context.annotation.ComponentScan;
        
import 
org.springframework.mail.MailMessage;
        
import 
org.springframework.mail.javamail.JavaMailSender;
        
import 
org.springframework.stereotype.Component;
 
        
import 
javax.annotation.Resource;
 
/**
 
* 监听消息的处理适配器
 
* Created by sdc on 2017/7/10.
 
*/
@Component
(
"handleMessageListenerAdapter"
)
public 
class 
HandleMessageListenerAdapter 
extends 
MessageListenerAdapter {
 
//    @Resource
//    private JavaMailSender mailSender;
 
    
/**
     
* 这块和activemq那个监听器差不多,都是监听信息,也都是onMessage方法。
     
* @param message
     
* @param channel
     
* @throws Exception
     
*/
    
@Override
    
public 
void 
onMessage(Message message, Channel channel) 
throws 
Exception {
        
String messageDetail = 
new 
String(message.getBody()); 
//消息体
        
System.out.println(
"消息消费:" 
+ messageDetail);
 
        
// 手动ACK
        
channel.basicAck(message.getMessageProperties().getDeliveryTag(), 
false
);
    
}
}

还有一些配制文件,请看

这个博客,就可以看到具体的配制了。

启动这个项目,就可以从队列消费消息了。消费者还是比较简单的,对应到相应的队列就可以处理了消息了。

本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1945974

转载地址:http://ywflx.baihongyu.com/

你可能感兴趣的文章
我的友情链接
查看>>
SUSE开启ssh服务
查看>>
spring表达式语言(SpEL)简述及Hello World示例
查看>>
我的yum本地源配置
查看>>
Linux基础篇之SELinux
查看>>
成功的培训
查看>>
IOS开发之UITableView1
查看>>
关于ARM的22个常用概念介绍
查看>>
经典语录
查看>>
Java学习笔记(29)——Java集合01之总体框架
查看>>
数据库权限设计
查看>>
.net 事件传递
查看>>
require和include区别
查看>>
新安装系统安装QQ不能使用
查看>>
react-navigation 导航栏使用
查看>>
vanish(squid) + HAProxy + nginx + memcached(redis)
查看>>
/etc/inittab文件详解
查看>>
一个较完整的SpringMVC工程的配置
查看>>
JavaScript实现前端路由
查看>>
maven 搭建
查看>>