上手前建议先看上一篇blog,了解RabbitMQ基础知识。
具体内容可以查询官网:http://www.rabbitmq.com/
一、环境搭建
todo 这个后续再补充,测试的时候为了方便,直接使用了现成的线下环境。
二、新建项目
老样子使用spring-boot新建一个web项目,spring-boot中已经整合了RabbitMq,所以可以直接加入RabbitMQ的starter,如下:
1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>定义Publisher
1
2
3
4
5
6
7
8
9
10
public class Publisher {
private AmqpTemplate amqpTemplate;
public void sendMessage(String exchange, String message) {
amqpTemplate.convertAndSend(exchange, message);
System.out.println("---------send message--------- : " + message);
}
}定义Consumer
实现MessageListener的接口,具体接收消息的配置放在xml中
1
2
3
4
5
6
7
8public class Consumer implements MessageListener {
public void onMessage(Message message) {
System.out.println("---------Recieved Message: ----------" + message);
}
}Rabbit配置文件
主要配置的是connectionFactory,包括RabbitMQ的环境信息,如host和port,以及用户名和密码。
其中还需要定义exchange 和 queue,然后将两者绑定,publisher的send函数中需要使用绑定的key来发送消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory"
username="admin"
password="HkCQECuJ"
host="10.165.125.21" port="5672"
virtual-host="/"
/>
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成-->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定义queue -->
<rabbit:queue name="lshTestQueue"
declared-by="connectAdmin"
durable="true" auto-delete="false" exclusive="false" />
<!-- 定义direct exchange,绑定queue -->
<rabbit:direct-exchange name="lshexchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="lshTestQueue" key="queueTestKey"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="lshexchangeTest" />
<!-- 消息接收者 -->
<bean id="consumer" class="com.example.rabbitmqtest2.core.Consumer"/>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="lshTestQueue" ref="consumer" />
</rabbit:listener-container>
</beans>编写一个测试Controller来发送消息
1
2
3
4
5
6
7
8
9
10
11@Controller
public class TestContoller {
@Resource
private Publisher publisher;
@RequestMapping("/testPublish")
public void testPublish() {
publisher.sendMessage("queueTestKey", "it is a test message");
}
}本地启动web工程就可以测试一下。