一、基础知识1。什么是RabbitMQ RabbitMQ是2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,简称MQ全称为MessageQueue,消息队列(MQ)是一种应用程序对应用程序的通信方法,由Erlang(专门针对于大数据高并发的语言)语言开发,可复用的企业消息系统,是当前最主流的消息中间件之一,具有可靠性、灵活的路由、消息集群简单、队列高可用、多种协议的支持、管理界面、跟踪机制以及插件机制。 2。什么是消息和队列 1。消息就是数据,增删改查的数据。例如在员工管理系统中增删改查的数据 2。队列指的是一端进数据一端出数据,例如C中(Queue数据结构) 3。什么是消息队列 1。消息队列指:一端进消息,一端出消息 2。RabbitMQ就是实现了消息队列概念的一个组件,以面向对象的思想去理解,消息队列就是类,而RabbitMQ就是实例,当然不仅仅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以实现消息队列。 4。什么地方使用RabbitMQ 1。在常见的单体架构中,主要流程是用户UI操作发起Http请求服务器处理然后由服务器直接和数据库交互,最后同步反馈用户结果 2。在微服务架构中,UI与微服务通信,主要是通过Http或者gRPC同步通信 问题分析 在上述2种情况下,我们发现在UI请求时都是同步操作,第2种架构虽然将整体服务按业务拆分成不同的微服务并且对应各自的数据库,但是在用户与微服务通信时,存在的问题依然没有解决,例如数据库的承载能力只能处理10w个请求,如果遇到高并发情况下,UI发起50w请求,那数据库是远远承载不了的,从而导致如下问题。 1。高并发请求导致系统性能下降响应慢,同时数据库承载风险加大 2。扩展性不强UI操作的交互对业务的依赖较大,导致用户体验下降 3。瞬时流量涌入巨大的话,服务器可能直接挂了 解决方案 为了解决性能瓶颈的问题。我们需要将同步通信方式换成异步通信方式。因此就使用消息队列,用户在UI中操作直接写入RabbitMQ然后直接返回,剩下的业务操作由消息队列和各自的微服务来完成 RabbitMQ的优势异步处理,响应快,增加了数据库(服务器的承载能力)削峰,可以把流量的高峰分解到不同的时间段来处理解耦(扩展性就更强),让UI和业务独立演化高可用,处理器如果发生故障了,对其他的处理器没有影响 RabbitMQ的不足增加了系统复杂性,不方便调试和开发,在使用RabbitMQ以前前端直接和服务交互,现在加了一层即时性降低了,在某一程度上提升了用户操作体验,也降低了用户体验,但是避免不了,取长补短更加依赖消息队列了 5。RabbitMQ组成概念 1。ConnectionFactory为Connection的制造工厂。 2。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。 3。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。 4。Exchange(交换机)我们通常认为生产者将消息投递到Queue中,实际上实际的情况是,生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者丢弃),而在RabbitMQ中的Exchange一共有4种策略,分别为:fanout(扇形)、direct(直连)、topic(主题)、headers(头部)二、如何落地RabbitMQ1。RabbitMQ环境安装 1。下载RabbitMQ 2。运行环境erlang 3。安装完成之后,加载RabbitMQ管理插件rabbitmqpluginsenablerabbitmqmanagement 4。安装成功访问RabbitMQ管理后台http:localhost:156722。创建系统业务 1。分别创建考勤服务,请假服务,计算薪酬服务,邮件服务,短信服务消费者角色 2。创建员工管理网站用于模拟前端调用,主要充当生产者角色 3。在员工管理网站和每一个模拟微服务中通过nuget引入RabbitMQ。Client 4。在员工管理网站中创建模拟添加考勤的控制器并加入生产者代码创建连接using(varconnectionfactory。CreateConnection()){创建通道varchannelconnection。CreateModel();定义队列channel。QueueDeclare(CreateAttendance,false,false,false,null);stringjsonJsonConvert。SerializeObject(attendanceDto);创建内容对象varpropertieschannel。CreateBasicProperties();发送消息channel。BasicPublish(exchange:,routingKey:CreateAttendance,basicProperties:properties,body:Encoding。UTF8。GetBytes(json));} 5。在考勤微服务中创建接口,并在接口中加入消费者代码varconnectionfactory。CreateConnection();varchannelconnection。CreateModel();创建消费者事件varconsumernewEventingBasicConsumer(channel);consumer。Received(model,ea){varbodyea。B1、逻辑代码,添加到数据库varmessageEncoding。UTF8。GetString(body。ToArray());objectjsonJsonConvert。DeserializeObject(message);Console。WriteLine(〔x〕创建考勤信息{0},message);};设置消费者属性p1。监听队列p2。消息确认ACKp3。消费者实例赋值channel。BasicConsume(queue:CreateAttendance,autoAck:false,consumer:consumer); 三、Exchange交换机及实例分析1。FanoutExchange(扇形交换机) fanout类型的Exchange路由规则非常简单,工作方式类似于多播一对多,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 1。生产者一个Exchange对应多个Queue,或者不声明Queue 2。消费者定义Exchange,如果生产者定义了Queue,那必须将exchange和queue绑定,如果没有定义队列,那消费者自己声明一个随机Queue用于接收消费消息 业务实例 当我们有员工需要请假,在员工管理系统提交请假,但是由于公司规定普通员工请假,需要发送短信到他的主管领导,针对此业务场景我们需要调用请假服务的同时去发送短信,这时需要两个消费者(请假服务,短信服务)来消费同一条消息,其实本质就是往RabbitMQ写入一个能被多个消费者接收的消息,所以可以使用扇形交换机,一个生产者,多个消费者。 生产者模拟使用调用控制器来实现〔HttpPost〕publicIEnumerableboolCreateLeave(CreateLeaveDtocreateLeaveDto){varfactorynewConnectionFactory(){HostName192。168。0。106,Port5672,Passwordguest,UserNameguest,VirtualHost};using(varconnectionfactory。CreateConnection()){varchannelconnection。CreateModel();定义交换机channel。ExchangeDeclare(exchange:Leavefanout,type:fanout);stringproductJsonJsonConvert。SerializeObject(createLeaveDto);varbodyEncoding。UTF8。GetBytes(productJson);varpropertieschannel。CreateBasicProperties();设置消息持久化properties。Pchannel。BasicPublish(exchange:Leavefanout,routingKey:,basicProperties:properties,body:body);}} 消费者实现IHostedService接口创建一个监听主机publicclassRabbitmqHostService:IHostedService{publicTaskStartAsync(CancellationTokencancellationToken){varfactorynewConnectionFactory(){HostNamelocalhost,Port5672,Passwordguest,UserNameguest,VirtualHost};varconnectionfactory。CreateConnection();varchannelconnection。CreateModel();1、定义交换机channel。ExchangeDeclare(exchange:Leavefanout,type:ExchangeType。Fanout);定义随机队列varqueueNamechannel。QueueDeclare()。QueueN队列和交换机绑定channel。QueueBind(queueName,Leavefanout,routingKey:);varconsumernewEventingBasicConsumer(channel);consumer。Received(model,ea){Console。WriteLine(34;model:{model});varbodyea。B1、业务逻辑varmessageEncoding。UTF8。GetString(body。ToArray());Console。WriteLine(〔x〕创建请假{0},message);1、自动确认机制缺陷,消息是否正常添加到数据库当中,所以需要使用手工确认channel。BasicAck(ea。DeliveryTag,true);};Qos(防止多个消费者,能力不一致,导致的系统质量问题。每一次一个消费者只成功消费一个)channel。BasicQos(0,1,false);消息确认(防止消息消费失败)channel。BasicConsume(queue:queueName,autoAck:false,consumer:consumer);}publicTaskStopAsync(CancellationTokencancellationToken){1、关闭rabbitmq的连接thrownewNotImplementedException();}} 2。DirectExchange(直连交换机) 直接交换器,工作方式类似于单播一对一,Exchange会将消息发送完全匹配ROUTINGKEY的Queue,缺陷是无法实现多生产者对一个消费者 1。生产者一个Exchange对应一个routingKey绑定,也可以声明队列并绑定,然后向指定的队列发送消息。 2。消费者需要定义Exchange和routingKey,如果生产者声明并绑定了队列,那消费者必须绑定生产者指定的Queue来接收消息,如果没有指定Queue,那消费者需要自己声明一个随机Queue然后绑定用于接收消息 当我们员工管理系统需要计算薪资并将结果以发送短信的方式告诉员工,这个时候我们就不太适合用扇形交换机了,因为换做是你,你也不想你的工资全公司都知道吧?这个时候就需要定制了一对一的场景了,那就在生产消息时使用直连交换机根据routingKey发送指定的消费者。 生产者模拟使用调用控制器来实现publicIEnumerableboolSendCalculateSalary(CalculateSalaryDtocalculateSalaryDto){varfactorynewConnectionFactory(){HostName192。168。0。106,Port5672,Passwordadmin,UserNameadmin,VirtualHost};using(varconnectionfactory。CreateConnection()){varchannelconnection。CreateModel();2、定义交换机channel。ExchangeDeclare(exchange:CalculateSalarydirect,type:direct);stringcalculateSalaryDtoJsonJsonConvert。SerializeObject(calculateSalaryDto);varbodyEncoding。UTF8。GetBytes(calculateSalaryDtoJson);3、发送消息varpropertieschannel。CreateBasicProperties();properties。P设置消息持久化p1指定交换机p2routingKeychannel。BasicPublish(exchange:CalculateSalarydirect,routingKey:productsms,basicProperties:properties,body:body);}} 消费者实现IHostedService接口创建一个监听主机publicclassRabbitmqHostService:IHostedService{publicTaskStartAsync(CancellationTokencancellationToken){varfactorynewConnectionFactory(){HostNamelocalhost,Port5672,Passwordguest,UserNameguest,VirtualHost};varconnectionfactory。CreateConnection();varchannelconnection。CreateModel();1、定义交换机channel。ExchangeDeclare(exchange:CalculateSalarydirect,type:ExchangeType。Direct);2、定义随机队列varqueueNamechannel。QueueDeclare()。QueueN3、队列要和交换机绑定起来channel。QueueBind(queueName,CalculateSalarydirect,routingKey:productsms);varconsumernewEventingBasicConsumer(channel);consumer。Received(model,ea){Console。WriteLine(34;model:{model});varbodyea。B1、业务逻辑varmessageEncoding。UTF8。GetString(body。ToArray());Console。WriteLine(〔x〕发送短信{0},message);1、消息是否正常添加到数据库当中,所以需要使用手工确认channel。BasicAck(ea。DeliveryTag,true);};3、消费消息channel。BasicQos(0,1,false);Qos(防止多个消费者,能力不一致,导致的系统质量问题。autoAck设为false不进行自动确认channel。BasicConsume(queue:queueName,autoAck:false,consumer:consumer);}publicTaskStopAsync(CancellationTokencancellationToken){1、关闭rabbitmq的连接thrownewNotImplementedException();}} 3。TopicExchange(主题交换机) Exchange绑定队列需要制定KKey可以有自己的规则;Key可以有占位符;或者,匹配一个单词、匹配多个单词,在Direct基础上加上模糊匹配;多生产者一个消费者,可以多对对,也可以多对1,真实项目当中,使用主题交换机。可以满足所有场景 1。生产者定义Exchange,然后不同的routingKey绑定 2。消费者定义Exchange,如果生产者定义了Queue,那必须将exchange和queue以及routingKey绑定,如果没有定义队列,那消费者自己声明一个随机Queue用于接收消费消息, 3。消费者routingKey的模糊匹配,生产者发送消息时routingKey定义以sms。开头,号只能匹配的routingKey为一级,例如(sms。A)或(sms。B)的发送的消息,能够匹配的routingKey为一级及多级以上,例如(sms。A)或者(sms。A。QWE。IOP) 在月底的时候我们需要把员工存在异常考勤信息,薪资结算信息,请假信息分别以邮件的形式发送给我们的员工查阅,我们知道这是一个典型的多个生产者,一个消费者场景,异常考勤信息,薪资结算信息,请假信息分别需要生产消息发送到RabbitMQ,然后供我们员工消费 分别模拟3个生产者:异常考勤信息,薪资结算信息,请假信息varfactorynewConnectionFactory(){HostName192。168。0。106,Port5672,Passwordadmin,UserNameadmin,VirtualHost};计算薪资生产者publicIEnumerableboolSendCalculateSalary(CalculateSalaryDtocalculateSalaryDto){using(varconnectionfactory。CreateConnection()){varchannelconnection。CreateModel();2、定义topic交换机channel。ExchangeDeclare(exchange:smstopic,type:topic);stringcalculateSalaryDtoJsonJsonConvert。SerializeObject(calculateSalaryDto);varbodyEncoding。UTF8。GetBytes(calculateSalaryDtoJson);3、发送消息varpropertieschannel。CreateBasicProperties();properties。P设置消息持久化p1指定交换机p2routingKeychannel。BasicPublish(exchange:smstopic,routingKey:sms。CalculateSalary,basicProperties:properties,body:body);}}考勤生产者publicIEnumerableboolSendCalculateAttendance(CalculateAttendanceDtocalculateAttendance){using(varconnectionfactory。CreateConnection()){varchannelconnection。CreateModel();2、定义topic交换机channel。ExchangeDeclare(exchange:smstopic,type:topic);stringcalculateAttendanceDtoJsonJsonConvert。SerializeObject(calculateAttendance);varbodyEncoding。UTF8。GetBytes(calculateAttendanceDtoJson);3、发送消息varpropertieschannel。CreateBasicProperties();properties。P设置消息持久化p1指定交换机p2routingKeychannel。BasicPublish(exchange:smstopic,routingKey:sms。CalculateAttendance,basicProperties:properties,body:body);}}请假信息生产者publicIEnumerableboolSendCalculateLeave(CalculateLeaveDtocalculateLeave){using(varconnectionfactory。CreateConnection()){varchannelconnection。CreateModel();2、定义topic交换机channel。ExchangeDeclare(exchange:smstopic,type:topic);stringcalculateLeaveJsonJsonConvert。SerializeObject(calculateLeave);varbodyEncoding。UTF8。GetBytes(calculateLeaveJson);3、发送消息varpropertieschannel。CreateBasicProperties();properties。P设置消息持久化p1指定交换机p2routingKeychannel。BasicPublish(exchange:smstopic,routingKey:sms。CalculateAttendance,basicProperties:properties,body:body);}}publicclassRabbitmqHostService:IHostedService{publicTaskStartAsync(CancellationTokencancellationToken){varfactorynewConnectionFactory(){HostNamelocalhost,Port5672,Passwordguest,UserNameguest,VirtualHost};varconnectionfactory。CreateConnection();varchannelconnection。CreateModel();1、定义交换机channel。ExchangeDeclare(exchange:smstopic,type:ExchangeType。Topic);2、定义随机队列varqueueNamechannel。QueueDeclare()。QueueN3、队列要和交换机绑定起来号的缺陷:只能匹配一级能够匹配一级及多级以上channel。QueueBind(queueName,smstopic,routingKey:sms。);varconsumernewEventingBasicConsumer(channel);consumer。Received(model,ea){Console。WriteLine(34;model:{model});varbodyea。B1、业务逻辑varmessageEncoding。UTF8。GetString(body。ToArray());Console。WriteLine(〔x〕发送短信{0},message);1、消息是否正常添加到数据库当中,所以需要使用手工确认channel。BasicAck(ea。DeliveryTag,true);};3、消费消息channel。BasicQos(0,1,false);Qos(防止多个消费者,能力不一致,导致的系统质量问题。autoAck设为false不进行自动确认channel。BasicConsume(queue:queueName,autoAck:false,consumer:consumer);}publicTaskStopAsync(CancellationTokencancellationToken){1、关闭rabbitmq的连接thrownewNotImplementedException();}} 4。HeaderExchange(头部交换机) headers类型的Exchange不依赖于routingkey与bindingkey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对以及xmatch参数,xmatch参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。 1。不需要依赖Key 2。更多的时候,像这种KeyValue的键值,可能会存储在数据库中,那么我们就可以定义一个动态规则来拼装这个Keyvalue,从而达到消息灵活转发到不同的队列中去四、RabbitMQ消息确认 我们根据上面的业务和代码简单实现了由生产者到消费者的一个业务流程,我们可以总结出知道,整个消息的收发过程包含有三个角色,生产者(员工管理网站)、RabbitMQ(Broker)、消费者(微服务),在理想状态下,按照这样实现,整个流程以及系统的稳定性,可能不会发生太大的问题,但是真正在实际应用中我们要去思考可能存在的问题,主要从三个大的方面去分析,然后发散。 1。生产端 2。存储端 3。消费端1。消息生产端 我们在给RabbitMQ发送消息时,如何去保证消息一定到达呢,我们可以使用RabbitMQ提供了2种生产端的消息确认机制 模式 描述 实现方式 Confirm模式 应答模式,生产者发送一条消息之后,Rabbitmq服务器做了个响应,表示消息确认收到 异步模式,在应答之前,可以继续发送消息,单条消息、批量消息 Tx事务模式 基于AMQP协议;可以把channel设置成一个带事务的通道道,分为三步:1。开启事务,提交事务,回滚事务 同步模式,在事务提交之前不能继续发送消息,事务模式效率差一些1。Confirm实现using(varconnectionfactory。CreateConnection()){varchannelconnection。CreateModel();2、定义topic交换机channel。ExchangeDeclare(exchange:smstopic,type:topic);stringcalculateAttendanceDtoJsonJsonConvert。SerializeObject(calculateAttendance);varbodyEncoding。UTF8。GetBytes(calculateAttendanceDtoJson);3、发送消息varpropertieschannel。CreateBasicProperties();properties。P设置消息持久化try{开启消息确认模式channel。ConfirmSelect();channel。BasicPublish(exchange:smstopic,routingKey:sms。CalculateAttendance,basicProperties:properties,body:body);如果一条消息或多消息都确认发送if(channel。WaitForConfirms()){Console。WriteLine(34;【{message}】发送到Broke成功!);}else{可以记录个日志,重试一下;}如果所有消息发送成功就正常执行;如果有消息发送失败;就抛出异常;channel。WaitForConfirmsOrDie();}catch(Exceptionex){Console。WriteLine(34;【{message}】发送到Broker失败!);}}2。Tx事务实现using(varconnectionfactory。CreateConnection()){varchannelconnection。CreateModel();2、定义topic交换机channel。ExchangeDeclare(exchange:smstopic,type:topic);stringcalculateAttendanceDtoJsonJsonConvert。SerializeObject(calculateAttendance);varbodyEncoding。UTF8。GetBytes(calculateAttendanceDtoJson);3、发送消息varpropertieschannel。CreateBasicProperties();properties。P设置消息持久化try{开启事务机制,AMQP协议支持channel。TxSelect();事务是协议支持的channel。BasicPublish(exchange:smstopic,routingKey:sms。CalculateAttendance,basicProperties:properties,body:body);提交事务只有事务提交了才会真正写入队列channel。TxCommit();}catch(Exceptionex){事务回滚channel。TxRollback();}}2。消息存储端 我们生产端给RabbitMQ发送消息成功后,如果RabbitMQ宕机了,会导致RabbitMQ中消息丢失,如何解决消息丢失问题,针对RabbitMQ消息丢失,我们可以在生产者中使用 1。持久化消息 2。集群3。消息消费端1。消费者宕机,导致消息丢失2。执行业务逻辑失败,但是消息已经被消费 当生产者写入消息到RabbitMQ后,消费服务接收消息期间,服务器宕机,导致消息丢失了,这个时候我们就应该使用RabbitMQ的消费端消息确认机制 模式 描述 特点 自动确认autoAck 自动确认,是消费消息的时候,只要收到消息,就直接回执给RabbitMQ,已经收到一切正常;直接总览所有了,如果有1w条消息,只是消费成功了一条消息,RabbitMQ也会认为你是全部成功了,会将所有消息从队列中移除;这样会导致消息的丢失 处理很快 手动确认 消费者消费一条,回执给RabbitMQ一条消息,RabbitMQ只删除当前这一条消息,相当于是一条消费了,删除一条消息; 性能稍微低一些 1。自动确认消息自动确认机制channel。BasicConsume(queue:CreateAttendance,autoAck:true,consumer:consumer); 2。手动确认 消费者收到消息。消费者发送确认消息给rabbitmq期间。执行业务逻辑失败了,但是消息已经确认被消费了,我们应该在我们的消费者接收消息回调执行业务逻辑后面,执行使用手动确认消息机制,保证消息不被丢失varconnectionfactory。CreateConnection();varchannelconnection。CreateModel();channel。ExchangeDeclare(exchange:smstopic,type:ExchangeType。Topic);varqueueNamechannel。QueueDeclare()。QueueNchannel。QueueBind(queueName,smstopic,routingKey:sms。);varconsumernewEventingBasicConsumer(channel);consumer。Received(model,ea){varmessageEncoding。UTF8。GetString(ea。Body。ToArray());执行业务逻辑手工确认告诉borker可以删除消息了channel。BasicAck(ea。DeliveryTag,true);否定:告诉Broker,这个消息我没有正常消费;requeue:true:重新写入到队列里去;false:你还是删除掉;channel。BasicReject(deliveryTag:ea。DeliveryTag,requeue:true);};autoAck设为false不进行自动确认channel。BasicConsume(queue:queueName,autoAck:false,consumer:consumer);3。由于服务器性能不一致导致消息堆积生产者发送高并发消息,消费者来不及处理,导致消息堆积,如何解决消息堆积问题?可以使用消费服务集群,将压力分散到不同的服务实例能解决这个问题,但是又产生了一个新的集群缺陷问题,假设集群服务器的强弱不一致,比较弱的服务器处理消息慢,就会导致大部分消息堆积在这台性能较差的服务器,那又该如何解决呢? 我们可以采用RabbitMQ的QOS功能,俗称限流,他的意思就是消费者一次可以拉取指定数量的消息,在这些消息未处理完毕之前,不会再向队列拉取消息。Qos(防止多个消费者,能力不一致,导致的系统质量问题。每一次一个消费者只成功消费一个)channel。BasicQos(0,1,false);4。如何保证消息不被重复消费(幂等性)1。生产时消息重复由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ 已经接收到了消息。这时候生产者就会重新发送一遍这条消息。生产者中如果消息未被确认,或确 认失败,我们可以使用定时任务(redisdb)来进行消息重试。2。消费时消息重复消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。 我们可以让每个消息携带一个全局的唯一ID,即可保证消息的幂等性消费者获取到消息后先根据id去查询redisdb是否存在该消息。如果不存在,则正常消费,消费完毕后写入redisdb。 如果存在,则证明消息被消费过,直接丢弃。 原文链接:https:www。cnblogs。comyuxl01p15978229。html 觉得本问对你有帮助的小伙伴,点个赞转发一下吧!