消息队列 · 2021年3月25日 0

SpringBoot整合ActiveMQ

  • activemq是由Apache出品对应java的jms的消息中间件,能够做到很好的分布式系统模块解耦异步通知而无需进行线程等待,下面是activemq与springboot的整合;
  • 安装: 首先需要下载activemq,本次是基于windows环境下的安装下载地址
  • .下载之后
    在这里插入图片描述
  • 解压后进入bin目录下,直接启动activemq.bat批处理文件;
    在这里插入图片描述
  • 可以验证一下,进入activemq的后台查看http://localhost:8161/admin,账号密码都是admin;
  • 下面开始进行整合,新建两个模块
    在这里插入图片描述
  • 然后是pom依赖,因为我之前的jar包都在里面实际在这个项目中只需要导入springboot和web,以及activemq的依赖就行了
<dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--热部署配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency>
        <!--mybatis起步依赖-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- MySQL连接驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--日志依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- 配置使用redis启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--junit测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!--fastJson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>RELEASE</version>
        </dependency>
        <!--MongoDB-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <!--activeMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
    </dependencies>
    <!--mybatis逆向工程插件-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.2</version>
            </plugin>
        </plugins>
    </build>
    <!--配置阿里云远程Maven仓库-->
    <repositories>
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>
  • 然后是配置文件
server.port=8900
#url地址,不需要修改
spring.activemq.broker-url=tcp://localhost:61616
#用户名
spring.activemq.user=admin
#密码
spring.activemq.password=admin
#是否启动内存模式,可以不配置,默认也是true
spring.activemq.in-memory=true
#是否使用PooledConnectionFactory
spring.activemq.pool.enabled=false
#点对点设置为false,发布订阅设置为true
spring.jms.pub-sub-domain=true
#数据库配置
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?
useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=1234
  • 两个项目的配置文件是一样的,只需要改一下服务端口号就可以了,下面是代码实现

  • 第一种是点对点,记得要把spring.jms.pub-sub-domain设置为false;

  • 首先是provider模块的编写

  • 新建一个配置类这个类里面就是创建一个队列的名称

package com.sunyw.config;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsMessagingTemplate;

@Configuration
public class ProviderConfig {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    public void sendQueue(String name) {
       jmsMessagingTemplate.convertAndSend("sendMsg", name);
    }

}
  • 然后是一个controller
package com.sunyw.controller;

import com.sunyw.config.ProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProviderController {
    @Autowired
    private ProviderConfig providerConfig;

    @RequestMapping("send")
    public void sendQueue(String name) {
        providerConfig.sendQueue(name);
    }

}
  • springboot的启动类
package com.sunyw;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms
public class ProviderApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class,args);
    }
}
  • 然后是customer模块的编写,配置文件和provider是一样的,只需要改一下端口号
  • 消费者服务类,@JmsListener注解是用来对队列进行监听的,destination是监听队列的名称,我这里写的一号和二号是为了演示,消费者的点对点
package com.sunyw.service;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

@Service
public class CustomerService {
    private int message1=0;
    private int message2=0;

    @JmsListener(destination = "sendMsg")
    public void customerMessage1(String name) {
        System.out.println("一号消费者接受信息:" + name);
        message1++;
        System.out.println("一号消费者接受消息次数为:"+message1);
    }
    @JmsListener(destination = "sendMsg")
    public void customerMessage2(String name) {
        System.out.println("二号消费者接受信息:" + name);
        message2++;
        System.out.println("二号消费者接受消息次数为:"+message2);
    }

}
  • springboot启动类
package com.sunyw;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class CustomerApplication {
    public static void main(String[] args) {
        SpringApplication.run(CustomerApplication.class,args);
    }
}
  • 启动服务,通过postman发送http请求到provider的controller类
    在这里插入图片描述
  • 通过activemq可视化界面查看一下队列信息
    在这里插入图片描述
  • 查看一下消费者是不是只有一个才能使用呢?
    在这里插入图片描述
  • 可以看到二号并没有打印出来,这就是点对点的使用;

    <================ =====一条优雅的分割线========================>

  • 下面是发布订阅的模式,不用改很多,首先将配置文件中的spring.jms.pub-sub-domain=false,改为true,然后在ProviderConfig类中需要加入一个类ActiveMQTopic类,这个类就是发布订阅时需要使用到的,用来定义一个队列的名称
package com.sunyw.config;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsMessagingTemplate;

@Configuration
public class ProviderConfig {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    public void sendQueue(String name) {
        ActiveMQTopic topic=new ActiveMQTopic("sendMsg");
        jmsMessagingTemplate.convertAndSend(topic, name);
    }

}
  • 发送消息看一下打印的结果,可以看到两个监听都接受到了消息;
    在这里插入图片描述
  • 可视化界面,两个消费者在这里插入图片描述
  • 发布订阅的模式也成功了,总结一下
1.点对点模式,生产者生产了一个消息,只能由一个消费者进行消费,如果有多个监听者,会通过轮循的方式来发送给消费者;
2.发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费;
  • end!