前言 提起消息队列,大部分java程序员并不陌生,在盛行的微服务体系下,rocketMQ,kfaka是很多java程序员的必修课。然而对于单体应用,rocketMQ亦或kfaka未免显得太重,这时又该怎么选择消息队列呢? 答案就是熟悉的redis需求背景 笔者的需求背景是这样的,公司有两个java服务,一个负责前台业务,也就是用户端的业务;一个负责后台业务,也就是管理端的业务。两个服务都只用到了mysql和redis,算是比较简单的业务服务。前台业务部署在两台服务器上,并且需要支持弹性扩张;后台业务部署在一台服务器上。此时有一个刷新本地缓存的功能,需要在管理后台服务发起,两个用户前台服务执行。 第一版,采用了直接请求的方案。管理后台服务会直接调用前台服务的内网IP,通过http请求直接刷机器的本地缓存。彼时在功能实现后,出现了些许稳定性问题。因为两个服务都有做权限控制,在http请求时可能是漏了一些关键数据。重点是直接调用维护麻烦,管理后台需要循环调用两个前台服务的接口,如果加了服务器还要改配置改代码。 改版时,想到这是一个典型的消息队列问题。生产者是管理后台,消费者是用户前台。集成rocketMQ或者kfaka肯定可以满足需求,但是维护工作量也是增加了很多,而已用组件里面的redis可以做消息队列,说实话还没用过redis做消息队列,正好体验一下。详细选型 调查了下,Redis实现消息队列可以说有三种方式:List点对点,pubsub,stream。List点对点。简单直接,Lpush,lpop,先进先出。考虑阻塞式,blpush,blpop,满足点对点场景。pubsub。经典应用型,没得说。只是不支持消息持久化,对于数据质量要求比较高的场景,需要慎用。stream。感觉比较新颖,基本能和成熟型消息队列产品对飚,集成难度感觉大一些。 综合考虑,先排除List,再考虑时间因素,先上一个pubsub。具体实现 这部分就是看代码了,笔者使用的是redis5。0,springboot2。3。通用的redis配置就不负伤了,讲讲消息队列需要的配置及代码 第一步是增加bean配置。初始化监听器、监听者与监听方法:初始化监听器paramconnectionFactoryfparamlistenerAdapterlreturncBeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory,MessageListenerAdapterlistenerAdapter){RedisMessageListenerContainercontainernewRedisMessageListenerContainer();container。setConnectionFactory(connectionFactory);container。addMessageListener(listenerAdapter,newPatternTopic(RedisConst。REDISCHANNEL));newPatternTopic(这里是监听的通道的名字)通道要和发布者发布消息的通道一致}绑定消息监听者和接收监听的方法paramredisReceiverrreturnlBeanMessageListenerAdapterlistenerAdapter(RedisMQReceiverredisReceiver){returnnewMessageListenerAdapter(redisReceiver,receiveMessage);} 第二步,实现receiver。这里笔者在收到消息后,将其转为内部事件再次分发,这样这部分代码可以通用。再有项目需要用到直接复制过去即可。Slf4jComponentpublicclassRedisMQReceiver{AutowiredprivateApplicationEventP收到通道的消息之后执行的方法parammessagestring消息publicvoidreceiveMessage(Stringmessage){try{这里是收到通道的消息之后执行的方法RedisMessageDTOmsgJsonUtil。parse(message,RedisMessageDTO。class);log。debug(收到消息{},msg);publisher。publishEvent(msg);}catch(Exceptione){log。error(处理消息异常。msg{},message,e);}}} 第三步,实现publisherSlf4jComponentpublicclassRedisMQPublisher{AutowiredprivateStringRedisTemplatestringRedisT向通道发送消息的方法paramchannel通道parammessage消息publicvoidsendChannelMess(Stringchannel,Objectmessage){try{StringjsonJsonUtil。toJSON(message);stringRedisTemplate。convertAndSend(channel,json);}catch(Exceptione){log。error(Exception,e);}}publicvoidsendChannelMess(Objectmessage){this。sendChannelMess(RedisConst。REDISCHANNEL,message);}} 使用的时候,消息发送后,各个接收端就能实现消息的监听与消费。最后 对于刷本地缓存这种操作,这个消息队列是能够符合要求的。但是其他的业务就不好说了,比如商品、订单、用户信息等等,这种不能持久化的消息队列明显是会出大问题的。给个赞,后面笔者再试下stream做消息队列的集成。