title: 69.使用Redis Stream实现MQ CreateTime: 2021-04-13 14:00:00 UpdateTime: 2023-10-28 12:11:55 CategoryName: web --- --- title: "69.使用Redis Stream实现MQ" date: 2021-04-13T14:00:00+08:00 draft: false tags: ["web"] categories: ["web"] author: "springrain" --- ## 1. 前言 https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/ https://docs.spring.io/spring-data/redis/docs/2.3.6.RELEASE/reference/html/#redis.streams https://gitee.com/chunanyong/springrain/tree/master/springrain-frame-cache-redis Redis5新增了一个Stream的数据类型,这个类型作为消息队列来使用时弥补了List和Pub/Sub的不足并且提供了更强大的功能,比如ack机制以及消费者组等概念,在有轻量消息队列使用需求时,使用这个新类型那是再好不过了. 注意:SpringBoot版本需要大于2.2(即spring-boot-starter-data-redis需要大于2.2).Redis推荐6.0+ 软件环节越少,运维成本越小,架构越简单,[springrain](https://gitee.com/chunanyong/springrain)中使用redis做了 缓存,分布式锁,原子计数器,消息队列.后期考虑基于redisearch实现全文检索. ## 2.项目实现 基于 ```spring-boot-starter-data-redis```实现 ``` ##application.yml中配置redis spring: redis: host: 127.0.0.1 port: 6379 password: ###密码,可以不设置#### timeout: 7200000 ``` pom中依赖```spring-boot-starter-data-redis``` ```xml org.springframework.boot spring-boot-starter-parent 2.5.0 org.springframework.boot spring-boot-starter-data-redis de.ruedigermoeller fst 2.57 ``` [RedisCacheConfig](https://gitee.com/chunanyong/springrain/blob/master/springrain-frame-cache-redis/src/main/java/org/springrain/frame/config/RedisCacheConfig.java)的代码 ```java /** * 缓存的配置,自定义 cacheManager 用于实现替换. * * @author springrain */ @Configuration("configuration-RedisCacheConfig") public class RedisCacheConfig { @Resource private RedisConnectionFactory redisConnectionFactory; // 序列化配置 解析任意对象 public static FstSerializer fstSerializer = new FstSerializer(); // 2.序列化String类型 public static StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); /** * 实际使用的redisTemplate,可以注入到代码中,操作redis * @return */ @Bean("redisTemplate") public RedisTemplate redisTemplate() { RedisTemplate redisTemplate = new RedisTemplate<>(); // 连接工厂 redisTemplate.setConnectionFactory(redisConnectionFactory); //设置默认的序列化器 redisTemplate.setDefaultSerializer(fstSerializer); // value序列化方式采用fstSerializer redisTemplate.setValueSerializer(fstSerializer); // hash的value序列化方式采用fstSerializer redisTemplate.setHashValueSerializer(fstSerializer); // key采用String的序列化方式 redisTemplate.setKeySerializer(stringRedisSerializer); // hash的key也采用String的序列化方式 redisTemplate.setHashKeySerializer(stringRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * 基于redis的cacheManager,使用spring-data-redis的RedisCacheManager * * @return CacheManager 缓存管理器 */ @Bean("cacheManager") public CacheManager cacheManager() { RedisCacheManager redisCacheManager = RedisCacheManager.builder(redisConnectionFactory) .cacheDefaults(defaultCacheConfig(-1)) //.transactionAware() .build(); return redisCacheManager; } /** * 默认的配置 * @param millis 默认的超时时间,单位毫秒 * @return RedisCacheConfiguration 默认的配置 */ private RedisCacheConfiguration defaultCacheConfig(long millis) { //默认配置 RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig(); //设置默认的失效时间,单位毫秒 if (millis>0) { defaultCacheConfig=defaultCacheConfig.entryTtl(Duration.ofMillis(millis)); } //设置序列化方式 defaultCacheConfig=defaultCacheConfig.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(stringRedisSerializer)) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(fstSerializer)) .disableCachingNullValues(); return defaultCacheConfig; } } ``` [AbstractMessageProducerConsumerListener.java](https://gitee.com/chunanyong/springrain/blob/master/springrain-frame-cache-redis/src/main/java/org/springrain/frame/config/AbstractMessageProducerConsumerListener.java) 抽象类实现的监听器,接口无法注入spring的bean,使用抽象类作为隔离Redis API的方式,方便后期更换MQ实现 ```java /** * 因为接口不能注入springBean,使用抽象类实现,主要用于隔离了Redis Stream API,方便后期更换MQ的实现. * 如果未确认消息消费,Redis Stream 暂时没有重试的API,项目启动时使用 retryFailMessage() 重试一次,业务代码可以自行调度retryFailMessage()方法 * 使用生产消费的group模式,用于多个消费者并行消费,只有group模式才有ack应答.如果要订阅发布,每个客户端创建一个group变通实现. * 订阅发布模式,使用 $ 符号订阅最新的消息,目前监听器存在问题,不能正常消费,原因待查 * 子类继承之后注入,需要使用IMessageProducerConsumerListener接口,例如 * * @Component("userMessageProducerConsumerListener") * public class UserMessageProducerConsumerListener extends AbstractMessageProducerConsumerListener * * * * @Resource * IMessageProducerConsumerListener userMessageProducerConsumerListener; * * @param 需要放入队列的对象 */ public abstract class AbstractMessageProducerConsumerListener implements StreamListener>, IMessageProducerConsumerListener,Closeable { private Logger logger = LoggerFactory.getLogger(getClass()); //默认的线程池 //private final Executor defaultExecutor = new SimpleAsyncTaskExecutor(); // 默认batchSize private final int defaultBatchSize=100; //泛型的类型 private final Class genericClass = ClassUtils.getActualTypeGenericSuperclass(getClass()); //监听的容器 private StreamMessageListenerContainer> container = null; @Resource private RedisConnectionFactory redisConnectionFactory; @Resource private RedisTemplate redisTemplate; /** * 消息队列的名称,redis里就是stream的名称 * * @return */ public abstract String getQueueName(); /** * 批量消费的数量 * * @return */ public int getBatchSize() { return defaultBatchSize; } /** * 消费者的名称 * * @return */ public abstract String getConsumerName(); /** * group的名称,如果为空,默认是 getQueueName()+"_defaultGroupName" * * @return */ public String getGroupName() { return getQueueName() + "_defaultGroupName"; } /** * 指定监听器的线程池 * * @return */ public Executor getExecutor() { return null; } /** * spring-data-redis 实现的 stream 原生消费者回调方法,依赖Redis ObjectRecord API,业务中不要直接调用!!!!!!. * 使用自行实现的onMessage(T value, String queueName, String messageId, Long messageTime) 方法 * * @param message 需要消费者处理的消息 */ @Override public void onMessage(ObjectRecord message) { try { RecordId recordId = messageSuccessRecordId(message); if (recordId != null) { //消息确认ack redisTemplate.opsForStream().acknowledge(getQueueName(), getGroupName(), recordId); } } catch (Exception e) { logger.error(e.getMessage(), e); } } /** * 消费消息,隔离Redis API,如果返回true则自动应答,如果返回false,认为消息处理失败 * * @param messageObjectDto * @return */ @Override public abstract boolean onMessage(MessageObjectDto messageObjectDto) throws Exception; /** * 初始化监听器 */ @PostConstruct private void registerConsumerListener() { try { String className = getClass().toString(); if (StringUtils.isBlank(getQueueName())) { logger.error(className + "的getQueueName()为空,registerConsumerListener()方法执行失败."); return; } if (StringUtils.isBlank(getGroupName())) { logger.error(className + "的getGroupName()为空,registerConsumerListener()方法执行失败."); return; } if (StringUtils.isBlank(getConsumerName())) { logger.error(className + "的getConsumerName()为空,registerConsumerListener()方法执行失败."); return; } int batchSize = getBatchSize(); if (batchSize < 1) { batchSize = defaultBatchSize; } Executor executor = getExecutor(); if (executor == null) { executor = new SimpleAsyncTaskExecutor(); } // 增加自定义的 BytesToTimestampConverter 类型转换器. // spring jdbc 把 datetime 类型解析成了 java.sql.timestamp,spring-data-redis并没用提供BytesToTimestampConverter,造成无法转换类型 CustomConversions customConversions = new RedisCustomConversions(Arrays.asList(new BytesToTimestampConverter())); // 使用 ObjectHashMapper 构造函数 注册自定义的转换器 ObjectHashMapper objectHashMapper= new ObjectHashMapper(customConversions); //监听器的配置项 StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(batchSize) //一批次拉取的最大count数 .executor(executor) //线程池 .pollTimeout(Duration.ZERO) //阻塞式轮询 //设置默认的序列化器,要和 redisTemplate 保持一致!!!!!!!!!!!!!!!!!!!!! //默认 targetType 会设置序列化器是 RedisSerializer.byteArray,这里手动初始化objectMapper,并设置自定义转换器和序列化器. .objectMapper(objectHashMapper) .keySerializer(RedisCacheConfig.stringRedisSerializer) .hashKeySerializer(RedisCacheConfig.stringRedisSerializer) .hashValueSerializer(RedisCacheConfig.fstSerializer) //.serializer(RedisCacheConfig.fstSerializer) .targetType(genericClass) //目标类型(消息内容的类型),如果objectMapper为空,会设置默认的ObjectHashMapper .build(); container = StreamMessageListenerContainer.create(redisConnectionFactory, options); //检查创建group组 prepareChannelAndGroup(redisTemplate.opsForStream(), getQueueName(), getGroupName()); // 通过xread命令也就是非消费者组模式直接读取,或者使用xreadgroup命令在消费者组中命令一个消费者去消费一条记录, // 我们可以通过0.>.$分别表示第一条记录.最后一次未被消费的记录和最新一条记录, // 比如创建消费者组时不能使用>表示最后一次未被消费的记录,比如0表示从第一条开始并且包括第一条, // $表示从最新一条开始但并不是指当前Stream的最后一条记录,是表示下一个xadd添加的那一条记录,所以说$在非消费者组模式的阻塞读取下才有意义! // 消费者 Consumer consumer = Consumer.from(getGroupName(), getConsumerName()); // 需要手动回复应答 ACK // container.receive(consumer, StreamOffset.fromStart(getQueueName()), this); // container.receive(consumer, StreamOffset.create(getQueueName(),ReadOffset.latest()), this); container.receive(consumer, StreamOffset.create(getQueueName(), ReadOffset.lastConsumed()), this); container.start(); //开启线程,重试异常的消息 executor.execute(() -> { //重试失败的消息 try { retryFailMessage(); } catch (Exception e) { logger.error(e.getMessage(), e); } }); } catch (Exception e) { logger.error(e.getMessage(), e); } } /** * 重试消息,项目启动时会重试一次,业务代码自行实现根据调度重试 * 避免死循环,最多1000次.如果单次返回的所有消息都是异常的,终止重试. * 如果全部重试成功,返回null.如果还有部分失败,就返回失败的消息记录 * * @return 返回重试失败的消息记录对象 */ @Override public List> retryFailMessage() throws Exception { int batchSize = getBatchSize(); if (batchSize < 1) { batchSize = defaultBatchSize; } //消费者 Consumer consumer = Consumer.from(getGroupName(), getConsumerName()); //设置配置 StreamReadOptions streamReadOptions = StreamReadOptions.empty().count(batchSize).block(Duration.ofSeconds(5)); List> retryFailMessageList = new ArrayList<>(); //避免死循环,最多1000次.如果单次返回的所有消息都是异常的,退出循环 for (int i = 0; i < 1000; i++) { List> readList = redisTemplate.opsForStream().read(genericClass, consumer, streamReadOptions, StreamOffset.fromStart(getQueueName())); //如果已经没有异常的消息,退出循环 if (CollectionUtils.isEmpty(readList)) { break; } //如果返回的消息全部都是异常的,退出循环 if (retryFailMessageList.containsAll(readList)) { break; } // 遍历异常的消息 for (ObjectRecord message : readList) { RecordId recordId = messageSuccessRecordId(message); //处理成功 if (recordId != null) { //消息确认ack redisTemplate.opsForStream().acknowledge(getQueueName(), getGroupName(), recordId); } else {//处理失败,记录下来 retryFailMessageList.add(message); } } } // 没有失败的消息记录 if (CollectionUtils.isEmpty(retryFailMessageList)) { return null; } //返回处理异常的消息 List> retryFailMessageObjectList = new ArrayList<>(); for (ObjectRecord message : retryFailMessageList) { retryFailMessageObjectList.add(objectRecord2MessageObject(message)); } return retryFailMessageObjectList; } /** * 在初始化容器时,如果key对应的stream或者group不存在时会抛出异常,所以我们需要提前检查并且初始化. * * @param ops * @param queueName * @param group */ private void prepareChannelAndGroup(StreamOperations ops, String queueName, String group) { String status = "OK"; try { StreamInfo.XInfoGroups groups = ops.groups(queueName); if (groups.stream().noneMatch(xInfoGroup -> group.equals(xInfoGroup.groupName()))) { //status = ops.createGroup(queueName, group); status = ops.createGroup(queueName, ReadOffset.from("0-0"), group); } } catch (Exception exception) { RecordId initialRecord = ops.add(ObjectRecord.create(queueName, "Initial Record")); Assert.notNull(initialRecord, "Cannot initialize stream with key '" + queueName + "'"); status = ops.createGroup(queueName, ReadOffset.from(initialRecord), group); } finally { Assert.isTrue("OK".equals(status), "Cannot create group with name '" + group + "'"); } } /** * 生产者向消息队列发送消息 * * @param message * @return */ @Override public MessageObjectDto sendProducerMessage(T message) throws Exception { if (message == null) { return null; } try { ObjectRecord record = Record.of(message).withStreamKey(getQueueName()); //StreamRecords.newRecord() //ObjectRecord record = Record.of(message).withStreamKey(queueName); RecordId recordId = redisTemplate.opsForStream().add(record); // return recordId.getValue(); return new MessageObjectDto(message, getQueueName(), recordId.getValue(), recordId.getTimestamp()); } catch (Exception e) { logger.error(e.getMessage(), e); throw e; //return null; } } /** * 消息消费是否成功,成功返回RecordId,失败返回null * * @param message * @return */ private RecordId messageSuccessRecordId(ObjectRecord message) { RecordId recordId = message.getId(); MessageObjectDto messageObjectRecord = objectRecord2MessageObject(message); try { boolean ok = onMessage(messageObjectRecord); if (ok) { return recordId; } else { return null; } } catch (Exception e) { logger.error(e.getMessage(), e); return null; } } /** * ObjectRecord2MessageObject 类型转换 * * @param message * @return */ private MessageObjectDto objectRecord2MessageObject(ObjectRecord message) { RecordId recordId = message.getId(); String messageId = recordId.getValue(); Long messageTime = recordId.getTimestamp(); String queueName = message.getStream(); T messageObject = message.getValue(); MessageObjectDto messageObjectRecord = new MessageObjectDto<>(messageObject, queueName, messageId, messageTime); return messageObjectRecord; } @Override public void close() throws IOException { if (container != null) { container.stop(); } } } ``` 2. 使用 子类监听器,声明为SpringBean,例如 ```@Component("userMessageProducerConsumerListener")``` ```java @Component("userMessageProducerConsumerListener") public class UserMessageProducerConsumerListener extends AbstractMessageProducerConsumerListener { @Override public String getQueueName() { return "queue_user"; } @Override public String getConsumerName() { return "consumer_user"; } @Override public boolean onMessage(MessageObjectDto messageObjectRecord) throws Exception { messageObjectRecord.getMessageObject(); return true; } } ``` Controller中注入子类的监听器,注意使用```IMessageProducerConsumerListener```接口声明 ```java @Resource private IMessageProducerConsumerListener userMessageProducerConsumerListener; /** * 健康检查 * * @return */ @RequestMapping(value = "/checkHealth", method = RequestMethod.GET) public ReturnDatas checkHealth() { User user=new User(); user.setAccount("1111111111111"); user.setPassword("2222222222222"); //发送到消息队列 userMessageProducerConsumerListener.sendProducerMessage(user); //重试消费失败的消息 //userConsumerListener.retryFailMessage(); return ReturnDatas.getSuccessReturnDatas(); } ``` ## 4.重试 Redis Stream 暂时没有重试的API,项目启动时使用 retryFailMessage() 重试一次,业务代码可以自行调度retryFailMessage()方法. 避免死循环,最多1000次.如果单次返回的所有消息都是异常的,终止重试. ```java //重试消费失败的消息,返回重试再次失败的消息 List> retryFailMessageObjectList=userMessageProducerConsumerListener.retryFailMessage(); ```