1 前言
Redis 基本上是互聯網公司必備的工具了,Redis的應用場景實在太多了,但是有很多相似的功能如果每個項目都要實現一遍就顯得太麻煩了,所以為了方便,我打算開發一個基于 Redis 的工具集,盡量做到開箱即用。
2 目前實現功能
這個工具集并沒有開發完成,實現了部分功能,如下圖
簡單介紹下已經實現的模塊:
common : 整個項目公共模塊,比如AOP工具等;
delay: Redis實現的延遲隊列;
lock: Redis實現的分布式鎖;
mq: Redis實現消息隊列;
query: Redis實現分頁模糊查詢;
web: Redis實現web相關的功能;
duplicate :防止重復提交;
以上的這些模塊都是已經實現的了,還有 社交、限流、冪等相關功能后面會陸續實現。
3 如何使用
1.引入 Maven 依賴
目前可以下載代碼上傳到自己的私服或者本地倉庫,后面會推到 Maven 中央倉庫
cn.org.wangchangjiu redis-util-spring-boot-starter 1.0.0-SNAPSHOT
2.配置文件(application.yaml)開啟各模塊功能開關
redis: util: mq: enable:true delay: enable:true
3.實現消息發送者
MQ消息發送:
延遲消息發送:
4.實現消息監聽器
MQ消息監聽器:
延遲消息監聽器:
4 MQ和delay實現細節
MQ實現細節
容器啟動時,簡單來說就是通過springboot自動裝配,創建一些Bean,如下圖:
值得注意的是,springboot3.X 自動裝配方式有點變化,需要創建文件 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件,文件內容就直接寫 自動配置類
RedisUtilAutoConfiguration 主自動裝配類會 import 各個模塊的自動裝配類:
我們以 RedisStreamAutoConfiguration 為例:
該裝配類生效需要顯示打開,然后就是創建各種Bean。
最主要的Bean有:
RedisMessageConsumerManager:
該Bean實現了 BeanPostProcessor 接口,主要作用是,獲取被注解 RedisMessageListener 修飾的方法,把信息封裝在 RedisMessageConsumerContainer 對象里,方便后面反射調用。
StreamMessageListenerContainer:
這個Bean主要是做 redis MQ 的配置,比如配置:一次最多獲取多少條消息、沒有消息時阻塞時間、執行任務的executor、錯誤處理器、以及消費組、是否自動ACK等配置,具體代碼如下:
@Bean(initMethod="start",destroyMethod="stop") @DependsOn("redisMessageConsumerManager") @ConditionalOnMissingBean publicStreamMessageListenerContainer>streamMessageListenerContainer(@AutowiredRedisMessageConsumerManagerredisMessageConsumerManager, @AutowiredRedisConnectionFactoryredisConnectionFactory, @AutowiredErrorHandlererrorHandler){ MyRedisStreamProperties.Optionsoptions=myRedisStreamProperties.getOptions(); StreamMessageListenerContainer.StreamMessageListenerContainerOptions >containerOptions= StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() //一次最多獲取多少條消息 .batchSize(options.getBatchSize()) //運行Stream的polltask .executor(getStreamMessageListenerExecutor()) //Stream中沒有消息時,阻塞多長時間,需要比`spring.redis.timeout`的時間小 .pollTimeout(options.getPollTimeout()) //獲取消息的過程或獲取到消息給具體的消息者處理的過程中,發生了異常的處理 .errorHandler(errorHandler) .build(); StreamMessageListenerContainer >streamMessageListenerContainer= StreamMessageListenerContainer.create(redisConnectionFactory,containerOptions); //獲取被RedisMessageListener注解修飾的bean Map consumerContainerGroups= redisMessageConsumerManager.getConsumerContainerGroups(); //循環遍歷,創建消費組 consumerContainerGroups.forEach((groupQueue,redisMessageConsumerContainer)->{ String[]groupQueues=groupQueue.split("#"); //創建消費組 createGroups(groupQueues); RedisMessageListenerredisMessageListener=redisMessageConsumerContainer.getRedisMessageListener(); if(!redisMessageListener.useGroup()){ //獨立消費不使用組 streamMessageListenerContainer.receive(StreamOffset.fromStart(groupQueues[1]),newDefaultGroupStreamListener(redisMessageConsumerContainer)); }else{ //消費組消費 if(redisMessageListener.autoAck()){ //自動ACK streamMessageListenerContainer.receiveAutoAck(Consumer.from(groupQueues[0],"consumer:"+UUID.randomUUID()), StreamOffset.create(groupQueues[1],ReadOffset.lastConsumed()),newDefaultGroupStreamListener(redisMessageConsumerContainer)); }else{ //手動ACK streamMessageListenerContainer.receive(Consumer.from(groupQueues[0],"consumer:"+UUID.randomUUID()), StreamOffset.create(groupQueues[1],ReadOffset.lastConsumed()),newDefaultGroupStreamListener(redisMessageConsumerContainer)); } } }); returnstreamMessageListenerContainer; } /** *創建消費組 *@paramgroupQueues */ privatevoidcreateGroups(String[]groupQueues){ //判斷是否存在隊列Key if(stringRedisTemplate.hasKey(groupQueues[1])){ //獲取消費組沒有則創建 StreamInfo.XInfoGroupsgroups=stringRedisTemplate.opsForStream().groups(groupQueues[1]); if(groups.isEmpty()){ stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]); }else{ AtomicBooleanexists=newAtomicBoolean(false); groups.forEach(xInfoGroup->{ if(xInfoGroup.groupName().equals(groupQueues[0])){ exists.set(true); } }); if(!exists.get()){ stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]); } } }else{ stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]); } } //todo后面這個線程池也可以交由用戶配置 privateExecutorgetStreamMessageListenerExecutor(){ AtomicIntegerindex=newAtomicInteger(1); intprocessors=Runtime.getRuntime().availableProcessors(); ThreadPoolExecutorexecutor=newThreadPoolExecutor(processors,processors,0,TimeUnit.SECONDS, newLinkedBlockingDeque<>(),r->{ Threadthread=newThread(r); thread.setName("async-stream-consumer-"+index.getAndIncrement()); thread.setDaemon(true); returnthread; }); returnexecutor; }
發送消息流程:
redis 延遲隊列的實現原理和這個差不多,主要是 redission延遲隊列 + 自定義注解 + 反射,代碼都差不多。
-
代碼
+關注
關注
30文章
4825瀏覽量
69043 -
隊列
+關注
關注
1文章
46瀏覽量
10927 -
Redis
+關注
關注
0文章
378瀏覽量
10939
原文標題:為了方便開發,我打算實現一個Redis 工具集
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論