.net +RabbitMQ消息队里的安装配置与示例代码

-- RabbitMQ消息队列的安装配置
【官网】:http://www.rabbitmq.com

应用场景

有时开发的大型系统中,涉及如下场景时,可使用消息队列(本例中以RabbitMQ为例): 1.)通过异步,提高响应时间.//实现异步处理. 2.)业务解耦.//一段代码中(业务方法)不包含大量的不同业务的具体细节. 3.)流量削峰.//大流量的时候先把请求放在消息队列中,太大则舍弃一部分.

基础资源

windows10,otp_win64_20.0.exe(rabbitmq运行环境) ,rabbitmq-server-3.6.11.exe(rabbitmq服务的安装程序),

使用须知

1.)这是一个免费开源的消息队列组件...除了服务程序,各语言的sdk,还自带监控运维程序以及api. 2.)由于消息队列往往起着一个很重要的角色,因此请熟读相关说明再实施,避免操作失误带来一些数据没有持久化而丢失的问题等.

配置步骤

>windows上的rabbitmq安装与配置.

>>RabbitMQ Server的运行环境及服务程序下载与安装.

运行环境:http://www.erlang.org/downloads. //本例(otp_win64_20.0.exe).

服务程序:http://www.rabbitmq.com/install-windows.html //下载后,安装的时间启动过程非常慢(本例:rabbitmq-server-3.6.11.exe.

>>设置windows系统环境变量.

方案1:输入命令:Setx  ERLANG_HOME “D:Program Fileserl9.0″

方案2手动通过【我的电脑】->【属性】->【高级系统设置】->【环境变量】->【系统变量】中增加键值对: ERLANG_HOME   D:Program Fileserl9.0



>>启动服务(示例).

d:

cd "D:Program FilesRabbitMQ Server abbitmq_server-3.6.11sbin"

rabbitmq-service install

rabbitmq-service enable

rabbitmq-service start

[ps]如果有的时候cmd窗口中显示没有配置环境变量,而实际上已经配置了,只需要把cmd窗口关闭重开就好了.

>>确认rabbitmq服务已安装并启动成功.

[1]如果显示”unable to connect to node `rabbit@机器名’  nodown, 就进行如下操作:显示node没有连接上,需要到C:Windows目录下,将.erlang.cookie文件,拷贝到用户目录下 C:Users{用户名},这是ErlangCookie文件,允许与Erlang进行交互,现在重复运行刚才的命令就会得到如下信息:

[2]实践发现,有时用管理员权限运行上述rabbitmqctl 反而说环境变量设置错误之类的,普通用户反而可以.

>>管理vhost用户权限.

<创建vhost>

rabbitmqctl add_vhost vhostname1

[root/root]

[ps]一般在线上,需要在确保已添加了自己的账号之后,将默认的账号(administrator  guest)删除掉.


<列出所有用户>

rabbitmqctl list_users

<创建用户>

rabbitmqctl  add_user  username1  pwd1

<设置用户对所有vhost的权限>

rabbitmqctl  set_permissions  username1  ".*"  ".*"  ".*"//授予对所有消息队列的配置、

<设置用户对指定vhost权限>

set_permissions [-p]//授予对指定vhost的读写配置权限.

<设置权限及用户组示例>

rabbitmqctl set_permissions -p / username1".*" ".*" ".*" //其中的/就是默认vhost的名称.

rabbitmqctl  set_user_tags username1 administrator //设置为管理员组

<角色枚举值>

//http://blog.csdn.net/super_rd/article/details/71191851?utm_source=itdadao&utm_medium=referral

management :(维护人员:可管理插件)User can access the management plugin
policymaker :(维护及设置人员:可管理插件以及vhosts的变量及设置)User can access the management plugin and manage policies and parameters for the vhosts they have access to.
monitoring :(运维监控人员:访问插件并监控所有connections,channles信息)User can access the management plugin and see all connections and channels as well as node-related information.
administrator :(超级管理员:管理用户,vhost,权限,插件等)User can do everything monitoring can do, manage users, vhosts and permissions, close other users connections, and manage policies and parameters for all vhosts.


<删除用户>rabbitmqctl delete_user username1

<修改密码>rabbitmqctl change_password  username1  newpassowrd


<完整示例1>

rabbitmqctl  add_user  root  root//用户名,密码都是root

rabbitmqctl set_permissions -p / root  ".*" ".*" ".*"//root用户所有vhost的所有权限

rabbitmqctl  set_user_tags root administrator //更换root用户的组为超级管理员组

rabbitmqctl delete_user guest //删除访客组

rabbitmqctl add_vhost pcw_vhost  //创建vhost:pcw_vhostvhost名称,类似数据库名.

rabbitmqctl set_permissions -p pcw_vhost root ".*" ".*" ".*  //root这个用户授予pcw_vhost这个vhost的所有权限.


常见问题

快速入门

>c#.net使用rabbitmq消息队列.

>>在应用程序应用RabbitMQ.

>>>关于转发器Exchange,Routingkey,bindQueue的相关介绍.

RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储

RabbitMQ提供了四种Exchangefanout,direct,topic,header。但常用的主要是fanout,direct,topic

性能排序:fanout > direct >> topic。比例大约为11106


任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

1.可以理解为路由表的模式

2.这种模式不需要RouteKey

3.这种模式需要提前将ExchangeQueue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue

1.一般情况可以使用rabbitMQ自带的Exchange”"(Exchange的名字为空字符串,下文称其为default Exchange)

2.这种模式下不需要将Exchange进行任何绑定(binding)操作

3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。

4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个标题”(RouteKey)Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。

2.这种模式需要RouteKey,也许要提前绑定ExchangeQueue

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey”MQ.log.error”的消息会被转发到该队列)

4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。



>>RabbitMQ运维.

>>>监听消费者是否消费并确认数据.

>>>无法正常的存取消息.

[可能1]rabbitmq windows服务是开启的,但是在cmd窗口运行”rabbitmqctl  status” 没有显示出节点状态信息:

方案1:直接先运行消息生产者,再运行消息消费者.//因为如果没有持久化,队列其实是不存在的.

方案2:需要重启该服务.

方案3:将交换器,队列,消息标识为持久化.

>>>确认已经调用了BasicAck但是还是无限重复入队.

根据实践,发现业务中,由于是在一个解决方案中,消息生产者和消息消费者重用了一个方法,这个方法里面包含了发送消息队列(生产消息)的逻辑,同时本身有没有严格的判断,所以就无限循环了.

>>>打开Connection异常.

Durable参数不可变更.

>>>需要发多次才会收到消息.

可以检查下消息消费者这段是否开启了多个实例. 开启了多个实例之后会自动分发.

>>>关于设置是否需要回复(ack).

[不需要应答]

BasicGetResult res = channel.BasicGet(queueName, true/*noAck*/);//如果noAcktrue,则不需要回复,也就是BasicAck就不需要,否则就需要.

if (res != null)

{

sMessage = Encoding.UTF8.GetString(res.Body);

this.UpdateLsbox(string.Format("direct_withack:{0}", sMessage));

channel.BasicAck(res.DeliveryTag, false);//应答,告诉RabbitMQ这条消息已经处理完毕,可以释放掉了.  //如果在不需要应答调用该句时会出错.

}

[需要应答]

BasicGetResult res = channel.BasicGet(queueName, false/*noAck*/);//如果noAcktrue,则不需要回复,也就是BasicAck就不需要,否则就需要.

if (res != null)

{

sMessage = Encoding.UTF8.GetString(res.Body);

this.UpdateLsbox(string.Format("direct_withack:{0}", sMessage));

channel.BasicAck(res.DeliveryTag, false);//应答,告诉RabbitMQ这条消息已经处理完毕,可以释放掉了.

}


>>>消息持久化.

A.)需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms

要注意,如果这个操作过程中有接收端处于连接状态它们不会自动断开,但也不会再收到消息,需要手动重新连接一下.


B.)通过设置ExchangeQueuedurable属性为true,可以使得队列和Exchange持久化,
但是这还不能使得队列中的消息持久化,还需要生产者在发送消息的时候,
basicPublish()方法的BasicProperties props参数中deliveryMode设置为2【使用basicPublish(MessageProperties.PERSISTENT_BASIC)】,
只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。如图所示:


这里需要注意的是,只有durabletrueExchangedurabletureQueues才能绑定,
否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。

C.)如果之前的声明是不持久化的,后来声明为持久化,这是不允许的,除非之前的非持久化队列能被重启服务删除掉.

D.)涉及持久化的服务,切记需要在发布的时候发布下消息的生产者端,有时一糊涂发布消费者端,就没有作用了.

E.)如何校验持久化生效了,就是通过重启rabbitmq服务,看之前未被消费者消费的消息是否依然存在.

F.)比如说你初始化了一个队列msgs.你会发现它真的持久了!每次服务器端重启后,通过list_queues命令查看的时候都存在.但是时间久了,这个msgs我们并不需要了,怎么办呢? 其实想清除这个队列只能删除它所在的vhost,然后再重建vhost,再设置vhost的权限.

rabbitmqctl delete_vhost   vhostName

rabbitmqctl add_vhost  vhostName

rabbitmqctl set_permissions -p  vhostName username1 ‘.*‘ ‘.*‘ ‘.*‘


>>>队列数太多.

实践发现可能是在Direct Exchange模式下:queuename这些参数没有指定..或者指定的是空字符串..

>>>可视化界面.

rabbitmq自带管理后台,安装后需要配置开启

进入rabbitmq安装目录中的sbin目录执行

rabbitmq-plugins enable rabbitmq_management  //发现有一次在服务器上,复制该命令过去就提示”could not recognise command ”,手输入就可以了.

重启rabbitmq服务生效

打开http://localhost:15672/即可看到管理后台

用户名密码均为guest

参考资料