• Stars
    star
    118
  • Rank 299,923 (Top 6 %)
  • Language
    Java
  • Created almost 9 years ago
  • Updated almost 9 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

基于 Spring 和 Redis 的分布式消息队列(MessageQueue)实现

spring-redis-mq

基于Spring和Redis的分布式消息队列(MessageQueue)

使用方法

创建项目

由于这个库还没有提交到Maven的中央仓库,所以需要手动将其导入到你的私人仓库中。首先fork源码到本地后使用mvn package打包。

然后添加到本地仓库:

mvn install:install-file  
-DgroupId=com.scienjus
-DartifactId=spring-redis-mq
-Dversion=1.0-SNAPSHOT
-Dpackaging=jar  
-Dfile=/path/to/jar/spring-redis-mq.jar

所有依赖 Jar:

<properties>
  <spring.version>4.1.8.RELEASE</spring.version>
  <jedis.version>2.7.3</jedis.version>
  <aspectj.version>1.8.7</aspectj.version>
  <quartz.version>2.2.1</quartz.version>
</properties>

<dependencies>
  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>${jedis.version}</version>
  </dependency>

  <!-- For quartz -->
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>${spring.version}</version>
  </dependency>

  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>${spring.version}</version>
  </dependency>

  <dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>${quartz.version}</version>
  </dependency>

  <!--For aop-->
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>${spring.version}</version>
  </dependency>

  <dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>${aspectj.version}</version>
  </dependency>
</dependencies>

配置Spring Bean

配置Jedis客户端:

@Bean
public JedisPool jedisPool() {
    return new JedisPool("127.0.0.1", 6379);
}

配置消费者:

@Bean
public Consumer consumer() {
    RedisConsumer consumer = new RedisConsumer();
    consumer.setJedisPool(jedisPool());
    return consumer;
}

配置生产者:

@Bean
public Producer producer() {
    RedisProducer producer = new RedisProducer();
    producer.setJedisPool(jedisPool());
    return producer;
}

配置消费者定时扫描任务(仅当使用注解驱动的消费者时才需要配置):

@Bean(initMethod = "init")
public SchedulerBeanFactory schedulerBeanFactory() {
    SchedulerBeanFactory schedulerBeanFactory = new SchedulerBeanFactory();
    schedulerBeanFactory.setConsumer(consumer());
    return schedulerBeanFactory;
}

注意一定要将initMethod设为init方法。

配置生产者自动推送任务(仅当使用注解驱动的生产者时才需要配置):

@Bean
public ProducerWorker producerWorker() {
    ProducerWorker producerWorker = new ProducerWorker();
    producerWorker.setProducer(producer());
    return producerWorker;
}

创建生产者实例

方法1:注入producer,调用sendMessage方法:

@Component
public class SayHelloProducer {

    @Autowired
    private Producer producer;

    public void sayHello(String name) {
        producer.sendMessage("say_hello", new Message(name));
    }
}

方法2:使用@ToQueue注解,retrun需要发送的对象(需要配置producerWorker):

@Producer
public class UserServiceImpl implements UserService {

    @Autowired
    private UserDao userDao;

    @ToQueue(topic = "new_user")  //添加新的用户后,将其发送到消息队列
    public User insert(User user) {
        this.userDao.insert(user);
        return user;
    }
}

创建消费者实例

方法1:注入consumer,调用getMessage方法(需要自己开线程循环获取):

@Component
public class SayHelloConsumer {

    @Autowired
    private Consumer consumer;

    public void sayHello() {
        Message message;
        while ((message = consumer.getMessage("say_hello")) != null) {
            System.out.println("Hello ! " + message.getContent() + " !");
        }
    }
}

方法2:为类添加@Consumer注解,为对应的方法添加@OnMessage注解(需要配置schedulerBeanFactory):

@Consumer
public class SayHelloConsumer {

    @OnMessage(topic = "say_hello")
    public void onSayHello(String name) {
        System.out.println("Hello ! " + name + " !");
    }
}

消费者的重试机制

@OnMessage方法的返回值类型为boolean类型,并且执行的结果为false时,系统认定此消息执行失败。

消息执行失败后,系统会将这个消息重新插入到消息队列中(顺序排在最后)。

通过@ToQueueexpire属性可以设置消息的生存时间(单位为秒),默认为永不过期。

当消息的生存时间超过后还没有消费成功,系统将会丢掉这个消息。

一个简单的例子:

//生产者

@Producer
public class UserServiceImpl implements UserService {

    @Autowired
    private UserDao userDao;

    @ToQueue(topic = "new_user", expire = 24 * 3600)  //添加新的用户后,将其发送到消息队列,消息的生存时间是24小时
    public User insert(User user) {
        this.userDao.insert(user);
        return user;
    }
}

@Consumer
public class NewUserConsumer {

    @OnMessage(value = "new_user")  //如果邮件发送失败,需要尝试重新发送。
    public boolean onNewUser(User user) {
        try {
            //发送邮件
            MailSender.sendWelcomeMail(user.getEmail(), user.getNickname());
            //发送成功,任务完成,返回true
            retrun true;
        } catch (Exception e) {
            //发送失败,尝试重试,返回false
            retrun false;
        }
    }
}

当然,如果一个消费方法永远不会失败(或是失败后不需要重试),可以直接设置为void方法。

PS:之前版本使用重试次数控制失败处理。但是系统修复需要的是时间,而重试次数很有可能会被浪费掉,因此改为了生存时间。

待办事项

  • 消息处理失败的处理(重新插入队列,设置消息的生存周期)
  • 将队列分为两个阶段,等待投递和等待接收
  • 监控页面

帮助

我的邮箱:[email protected]

由于 Redis 本身的限制,这个项目并不适合使用在生产环境中,在此推荐 Redis 作者开发的消息队列 Disque。一些介绍:

More Repositories

1

smartqq

SmartQQ(WebQQ)的Api ,你可以用它实现自己的QQ 机器人 a qq robot based on smartqq (webqq) api
Java
1,149
star
2

spring-restful-authorization

这个 Demo 用于演示如何在 RESTful 下使用自定义 Token 保持客户端登录状态,依靠 Spring 的拦截器和解析器完成权限验证及登录用户注入,并使用 Redis 存储 Token。
Java
859
star
3

spring-authorization-manager

基于 Spring MVC,提供 API 服务端的身份验证功能。通过 Redis、MySQL 维护登录用户与分配 Token 的映射关系。
Java
334
star
4

qqbot

基于SmartQQ(WebQQ)的QQ机器人 / a qq robot based on smartqq(webqq) api
Ruby
280
star
5

pixiv-crawler

通过网页爬虫批量下载 Pixiv 图片
Java
57
star
6

pixiv-parser

批量抓取和下载 Pixiv 上的图片 Batch download pictures from Pixiv
Java
36
star
7

spring-authorization-manager-demo

https://github.com/ScienJus/spring-authorization-manager 的Demo
Java
34
star
8

spring-disque

基于 Spring 和 Jedis 的 Disque 封装,使用注解驱动
Java
22
star
9

spring-cloud-etcd

[WIP] Etcd integration with Spring Cloud, based on etcd v3 api(jetcd).
Java
20
star
10

elasticsearch-chinese-analyzers-contrasts

Elasticsearch中文分词插件分词效果对比(Ik、Ansj、Mmseg和Jieba)
JavaScript
10
star
11

play-with-ruby

记录个人学习 Ruby 的过程
Ruby
10
star
12

smzdm-push

什么值得买非官方的邮件推送,http://smzdm.scienjus.com/ 简单测试中(注册密码必须8位以上)
Ruby
6
star
13

fastroute-spring

nikic/FastRoute implementation in Spring
Java
6
star
14

django_gaode_maps

Django 管理后台的高德地图控件
Python
5
star
15

thrift-spring-boot-starter

[WIP] Apache Thrift integration with Spring Boot. nonblocking client/server, service discovery, load balancing and more.
Java
4
star
16

mybatis-redis-counter

use redis to counting in mybatis, non invasive and easy to use / 在MyBatis项目中使用Redis辅助计数,使用简单且无侵入性
Java
3
star
17

konata

micro web framework like Pippo and written in Kotlin
Kotlin
3
star
18

sofa-rpc-consul-registry-demo

Java
2
star
19

learn-spring-restdocs

Learn how to use Spring REST Docs based on Spring Boot2 and JUnit5.
HTML
1
star
20

fckjnb

工行猴年纪念币预约脚本
Ruby
1
star
21

sofa-rpc-hystrix-with-spring-cloud-example

Java
1
star