1、RabbitMQ介绍
1.1、什么是RabbitMQ?
RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。
1.2、什么是AMQP?
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
而在AMQP中主要有两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型:
1.3、RabbitMQ的基础概念
- Broker:简单来说就是消息队列服务器实体
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
- producer:消息生产者,就是投递消息的程序
- consumer:消息消费者,就是接受消息的程序
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
1.4、RabbitMQ的特性
- 可靠性:包括消息持久化,消费者和生产者的消息确认
- 灵活路由:遵循AMQP协议,支持多种Exchange类型实现不同路由策略
- 分布式:集群的支持,包括本地网络与远程网络
- 高可用性:支持主从备份与镜像队列
- 多语言支持:支持多语言的客户端
- WEB界面管理:可以管理用户权限,exhange,queue,binding,与实时监控
- 访问控制:基于vhosts实现访问控制
- 调试追踪:支持tracing,方便调试
2、RabbitMQ的官网在哪里?
http://www.rabbitmq.com/
3、RabbitMQ在哪里下载?
http://www.rabbitmq.com/download.html
4、如何安装RabbitMQ
4.1、通过安装RabbitMQ的源来安装
在Ubuntu上安装RabbitMQ非常简单
lion@ubuntu1404:~$ sudo echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etcaptsources.listdrabbitmqlist$ wget -O https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -$ sudo apt-get update install rabbitmqserver
其他系统安装方法:http://www.rabbitmq.com/download.html
4.2、通过源码安装
本文中的实例,主要通过源码安装来演示。
4.2.1、安装Erlang@H_301_247@
相关安装文档:http://erlang.org/erldoc
lion@node1 install y erlangnox erlangdev erlangsrc
4.2.2、Rabbitmq 3.6.3安装@H_301_247@
相关安装文档:http://www.rabbitmq.com/install-generic-unix.html。
我们先下载源码并解压
$ mkdir p _app:~/_app$ wget http:/wwwcomreleasesserverv3.6.3genericunix-3.63.tarxz:~/_app$ xz d rabbitmq_app$ tar xvf rabbitmq3.tar_app$ cd rabbitmq_server3
设置环境变量$RABBITMQ_HOME
$ vi bashrc
在.bashrc中添加以下内容
export RABBITMQ_HOME="/home/lion/_app/rabbitmq_server-3.6.3" PATH"$RABBITMQ_HOME/sbin:$PATH"
让环境变量生效
$ source 启动Rabbitmq
$ rabbitmq
安装以后可以通过下面的命令,停止、启动:
$ rabbitmqctl stop
$ rabbitmqctl start
4.3、开启web管理插件
创建一个用户lion,并设置密码123456:
$ rabbitmqctl add_user lion 123456
可以通过下面的命令,查看现有的用户更表
$ rabbitmqctl list_usersListing users ...guest [administrator]lion []
这个时候lion用户是不能访问web管理插件的,需要配置用户角色,用户角色可分为五类,超级管理员,监控者,策略制定者,普通管理者以及其他。
-
超级管理员(administrator)
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
-
监控者(monitoring)
可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
-
策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下),同时可以对policy进行管理。但无法查看节点的相关信息。
-
普通管理者(management)
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
-
其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
通过下面的命令,可以将lion添加到administrator用户组:
$ rabbitmqctl set_user_tags lion administrator
然后可以用下面的命令来启用/信上管理插件:
plugins enable rabbitmq_management (启用插件)plugins disable rabbitmq_management (禁用插件)
通过浏览访问http://127.0.0.1:15672/
输入用户名lion,密码123456就可以看到后台了。
@H_
404_527@
rabbitmqctl的更多命令参考:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
4.4、RabbitMQ 的配置文件介绍
RabbitMQ的配置文件目录默认是$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf,如果文件不存在,可以自己创建。
配置文件全部说明地址:http://www.rabbitmq.com/configure.html#configuration-file
%%-*- mode: erlang -*----------------------------------------------------------------------------- RabbitMQSampleConfigurationFile.%%See http//www.rabbitmq.com/configure.html for details.----------------------------------------------------------------------------[{rabbit,[%%NetworkConnectivity====================%%By default will listen on all interfacesusing the standard (reserved) AMQP port.默认的监听端口tcp_listeners[5672]},128)">To listen on a specific interface provide a tuple of {IpAddressPort}.For example to listen only on localhost for both IPv4andIPv6:也可以使用下面的格式进行指定IP和端口的监听[{"127.0.0.1" },161)">{"::1"}]},76)"> SSL listeners are configured in the same fashion as TCP listeners including the option to control the choice of SSL连接端口配置ssl_listeners5671Number of Erlang processes that will accept connections the TCP SSL listeners TCP连接的进程数num_tcp_acceptors10num_ssl_acceptors1Maximum time AMQP 08/91 handshake after socket connection SSL handshake),76)"> milliseconds超时时间,单位毫秒handshake_timeout10000Log levels currently just used connection logging).One of 'debug' 'info''warning''error'or'none' decreasing order of verbosityDefaults to 日志的级别,默认是infolog_levels[{connection infochannelSet'true' to perform reverse DNS lookups when accepting a connectionHostnames will then be shown instead of IP addresses rabbitmqctl the management pluginreverse_dns_lookupstrueSecurity AAA==============安全配置Thedefault"guest" user is only permitted to access the server via a loopback interfaceeg localhostloopback_users[<<"guest">>]},76)"> Uncomment the following line if you want to allow access to the guest user from anywhere on the network[]},128)">Configuring//www.rabbitmq.com/ssl.html for full documentation.ssl_optionscacertfile"/path/to/testca/cacert.pem"certfile"/path/to/server/cert.pem"keyfile"/path/to/server/key.pem"verify verify_peerfail_if_no_peer_certfalseChoose the available SASL mechanisms to expose two built in mechanisms are 'PLAIN'and'AMQPLAIN'Additional mechanisms can be added via plugins//www.rabbitmq.com/authentication.html for more details.auth_mechanisms['PLAIN'Select an authentication database to use comes bundledwith a built authdatabase based on mnesiaauth_backendsrabbit_auth_backend_internalConfigurations supporting the rabbitmq_auth_mechanism_ssl rabbitmq_auth_backend_ldap plugins NBThese options require that the relevant plugin enabled//www.rabbitmq.com/plugins.html for further details.RabbitMQauthmechanismssl plugin makes it possible to authenticate a user based on the client's SSL certificate. %% %% To use auth-mechanism-ssl,add to or replace the auth_mechanisms %% list with the entry 'EXTERNAL'. %% %% {auth_mechanisms,['']}, %% The rabbitmq_auth_backend_ldap plugin allows the broker to %% perform authentication and authorisation by deferring to an %% external LDAP server. %% For more information about configuring the LDAP backend,see %% http://www.rabbitmq.com/ldap.html. %% Enable the LDAP auth backend by adding to or replacing the %% auth_backends entry: %% {auth_backends,[rabbit_auth_backend_ldap]},68)"> %% This pertains to both the rabbitmq_auth_mechanism_ssl plugin and %% STOMP ssl_cert_login configurations. See the rabbitmq_stomp %% configuration section later in this file and the README in %% https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further %% details. %% To use the SSL cert's CN instead of its DN the usernamessl_cert_login_from common_name SSL handshake timeoutssl_handshake_timeout5000Password hashing implementationWill only affect newly created users recalculate hash an existing user it's necessary to update her password. %% {password_hashing_module,rabbit_password_hashing_sha256},68)"> %% Default User / VHost %% ==================== %% 用户访问设置 %% On first start RabbitMQ will create a vhost and a user. These %% config items control what gets created. See %% http://www.rabbitmq.com/access-control.html for further %% information about vhosts and access control. %% {default_vhost,<<"/">>},68)"> %% {default_user,<<"guest">>},68)"> %% {default_pass,68)"> %% {default_permissions,[<<".*">>,<<".*">>,<<".*">>]},68)"> %% Tags for default user %% For more details about tags,see the documentation for the %% Management Plugin at http://www.rabbitmq.com/management.html. %% {default_user_tags,[administrator]},68)"> %% Additional network and protocol related configuration %% ===================================================== %% Set the default AMQP heartbeat delay (in seconds). %% 设置默认AMQP心跳延迟(秒) %% {heartbeat,600},68)"> %% Set the max permissible size of an AMQP frame (in bytes). %% {frame_max,131072},68)"> %% Set the max frame size the server will accept before connection %% tuning occurs %% {initial_frame_max,4096},68)"> %% Set the max permissible number of channels per connection. %% 0 means "no limit". %% {channel_max,128},68)"> %% Customising Socket Options. %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for %% further documentation. %% {tcp_listen_options,[{backlog,68)"> %% {nodelay,true},68)"> %% {exit_on_close,false}]},68)"> %% Resource Limits & Flow Control %% ============================== %% See http://www.rabbitmq.com/memory.html for full details. %% Memory-based Flow Control threshold. %% {vm_memory_high_watermark,0.4},68)"> %% Alternatively,we can set a limit (in bytes) of RAM used by the node. %% Or you can set absolute value using memory units. %% Supported units suffixes: %% k,kiB: kibibytes (2^10 bytes) %% M,MiB: mebibytes (2^20) %% G,GiB: gibibytes (2^30) %% kB: kilobytes (10^3) %% MB: megabytes (10^6) %% GB: gigabytes (10^9) %% Fraction of the high watermark limit at which queues start to %% page message out to disc in order to free up memory. %% Values greater than 0.9 can be dangerous and should be used carefully. %% 内存最大使用比例 %% {vm_memory_high_watermark_paging_ratio,0.5},68)"> %% Interval (in milliseconds) at which we perform the check of the memory %% levels against the watermarks. %% 检查内存的间隔(毫秒) %% {memory_monitor_interval,2500},68)"> %% Set disk free limit (in bytes). Once free disk space reaches this %% lower bound,a disk alarm will be set - see the documentation %% listed above for more details. %% {disk_free_limit,50000000},68)"> %% Or you can set it using memory units (same as in vm_memory_high_watermark) %% Values lower than 1.0 can be dangerous and should be used carefully. %% Misc/Advanced Options %% ===================== %% NB: Change these only if you understand what you are doing! %% To announce custom properties to clients on connection: %% {server_properties,[]},68)"> %% How to respond to cluster partitions. %% See http://www.rabbitmq.com/partitions.html for further details. %% {cluster_partition_handling,ignore},68)"> %% Make clustering happen *automatically* at startup - only applied %% to nodes that have just been reset or started for the first time. %% See http://www.rabbitmq.com/clustering.html#auto-config for %% further details. %% 设置集群启动的节点 %% {cluster_nodes,{['rabbit@myhostcom'],disc}},68)"> %% Interval (in milliseconds) at which we send keepalive messages %% to other cluster members. Note that this is not the same thing %% as net_ticktime; missed keepalive messages will not cause nodes %% to be considered down. %% 集群消息同步的时间(毫秒) %% {cluster_keepalive_interval,10000},68)"> %% Set (internal) statistics collection granularity. %% {collect_statistics,none},68)"> %% Statistics collection interval (in milliseconds). %% {collect_statistics_interval,5000},68)"> %% Explicitly enable/disable hipe compilation. %% {hipe_compile,68)"> %% Timeout used when waiting for Mnesia tables in a cluster to %% become available. %% {mnesia_table_loading_timeout,30000},68)"> %% Size in bytes below which to embed messages in the queue index. See %% http://www.rabbitmq.com/persistence-conf.html %% {queue_index_embed_msgs_below,4096} ]},68)"> %% ---------------------------------------------------------------------------- %% Advanced Erlang Networking/Clustering Options. %% See http://www.rabbitmq.com/clustering.html for details {kernel,68)"> [%% Sets the net_kernel tick time. %% Please see http://erlang.org/doc/man/kernel_app.html and %% http://www.rabbitmq.com/nettick.html for further details. %% {net_ticktime,60} %% RabbitMQ Management Plugin %% See http://www.rabbitmq.com/management.html for details {rabbitmq_management,68)"> [%% Pre-Load schema definitions from the following JSON file. See %% http://www.rabbitmq.com/management.html#load-definitions %% {load_definitions,"/path/to/schema.json"},68)"> %% Log all requests to the management HTTP API to a file. %% 所有请求的HTTP API文件日志的路径。 %% {http_log_dir,"/path/to/access.log"},68)"> %% Change the port on which the HTTP listener listens,68)"> %% specifying an interface for the web server to bind to. %% Also set the listener to use SSL and provide SSL options. %% Web管理的地址和端口 %% {listener,[{port,12345},68)"> %% {ip,"127.0.0.1"},68)"> %% {ssl,68)"> %% {ssl_opts,[{cacertfile,"/path/to/cacert.pem"},68)"> %% {certfile,"/path/to/cert.pem"},68)"> %% {keyfile,"/path/to/key.pem"}]}]}, %% One of 'basic','detailed' or 'none'. See %% http://www.rabbitmq.com/management.html#fine-stats for more details. %% {rates_mode,basic},68)"> %% Configure how long aggregated data (such as message rates and queue %% lengths) is retained. Please read the plugin's documentation in//www.rabbitmq.com/management.html#configuration for more detailssample_retention_policies[{global[{605{3600864001200basicdetailed}]}]}----------------------------------------------------------------------------ShovelPlugin//www.rabbitmq.com/shovel.html for detailsrabbitmq_shovelshovels[%% A named shovel workermy_first_shovel[List the source broker which to consume URI predeclarations all source brokerbrokers"amqp://user:password@host.domain/my_vhost"declarations[]} the destination broker to publish todestinations A singular version of the 'brokers' elementbroker"amqp://"Name of the queue to shovel messages fromqueue<<"your-queue-name-goes-here">>},128)">Optional prefetch countprefetch_count to acknowledge messages no_ack never (auto) on_publish after each message republished on_confirm the destination broker confirms receiptack_modeOverwrite fields of the outbound basicpublishpublish_fieldsexchange"my_exchange"routing_key"from_shovel">>}]},128)">Static list of basicproperties to set on republicationpublish_propertiesdelivery_mode2 number of seconds to wait before attempting to reconnect the event of a connection failurereconnect_delay2.5}]}End of my_first_shovel]}Rather than specifying some values pershovel you can specify them all shovels heredefaults}]}StompAdapter//www.rabbitmq.com/stomp.html for detailsrabbitmq_stomp the format generally the same the brokerListen only on localhost ipv4 & ipv6 on a specific port61613 SSL connections on a specific port61614 SSL optionsExtract a name the client's certificate when using SSL. %% {ssl_cert_login,68)"> %% Set a default user name and password. This is used as the default login %% whenever a CONNECT frame omits the login and passcode headers. %% Please note that setting this will allow clients to connect without %% authenticating! %% {passcode,"guest"}]},68)"> %% If a default user is configured,or you have configured use SSL client %% certificate based authentication,you can choose to allow clients to %% omit the CONNECT frame entirely. If set to true,the client is %% automatically connected as the default user or user supplied in the %% SSL certificate whenever the first frame sent on a session is not a %% CONNECT frame. %% {implicit_connect,true} %% RabbitMQ MQTT Adapter %% See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md %% for details {rabbitmq_mqtt,68)"> [%% Set the default user name and password. Will be used as the default login %% if a connecting client provides no other login details. %% Enable anonymous access. If this is set to false,clients MUST provide %% login information in order to connect. See the default_user/default_pass %% configuration elements for managing logins without authentication. %% {allow_anonymous,68)"> %% If you have multiple chosts,specify the one to which the %% adapter connects. %% {vhost,68)"> %% Specify the exchange to which messages from MQTT clients are published. %% {exchange,<<"amq.topic">>},68)"> %% Specify TTL (time to live) to control the lifetime of non-clean sessions. %% {subscription_ttl,1800000},68)"> %% Set the prefetch count (governing the maximum number of unacknowledged %% messages that will be delivered). %% {prefetch,10},68)"> %% TCP/SSL Configuration (as per the broker configuration). %% {tcp_listeners,[1883]},68)"> %% {ssl_listeners,68)"> %% Number of Erlang processes that will accept connections for the TCP %% and SSL listeners. %% {num_tcp_acceptors,68)"> %% {num_ssl_acceptors,1},68)"> %% TCP/Socket options (as per the broker configuration). %% RabbitMQ AMQP 1.0 Support %% See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md {rabbitmq_amqp1_0,68)"> [%% Connections that are not authenticated with SASL will connect as this %% account. See the README for more information. %% Enable protocol strict mode. See the README for more information. %% {protocol_strict_mode,false} %% RabbitMQ LDAP Plugin %% See http://www.rabbitmq.com/ldap.html for details. {rabbitmq_auth_backend_ldap,68)"> [%% %% Connecting to the LDAP server(s) %% ================================ %% Specify servers to bind to. You *must* set this in order for the plugin %% to work properly. %% {servers,["your-server-name-goes-here"]},68)"> %% Connect to the LDAP server using SSL %% {use_ssl,false},68)"> %% Specify the LDAP port to connect to %% {port,389},68)"> %% LDAP connection timeout,in milliseconds or 'infinity' %% {timeout,infinity},68)"> %% Enable logging of LDAP queries. %% One of %% - false (no logging is performed) %% - true (verbose logging of the logic used by the plugin) %% - network (as true,but additionally logs LDAP network traffic) %% Defaults to false. %% {log,68)"> %% Authentication %% ============== %% Pattern to convert the username given through AMQP to a DN before %% binding %% {user_dn_pattern,"cn=${username},ou=People,dc=example,dc=com"},you can convert a username to a Distinguished %% Name via an LDAP lookup after binding. See the documentation for %% full details. %% When converting a username to a dn via a lookup,set these to %% the name of the attribute that represents the user name,and the %% base DN for the lookup query. %% {dn_lookup_attribute,"userPrincipalName"},68)"> %% {dn_lookup_base,"DC=gopivotal,DC=com"},68)"> %% Controls how to bind for authorisation queries and also to %% retrieve the details of users logging in without presenting a %% password (e.g.,SASL EXTERNAL). %% - as_user (to bind as the authenticated user - requires a password) %% - anon (to bind anonymously) %% - {UserDN,Password} (to bind with a specified user name and password) %% Defaults to 'as_user %% {other_bind,as_user},68)"> %% Authorisation %% ============= %% The LDAP plugin can perform a variety of queries against your %% LDAP server to determine questions of authorisation. See %% http://www.rabbitmq.com/ldap.html#authorisation for more %% information. %% Set the query to use when determining vhost access %% {vhost_access_query,{in_group,68)"> %% "ou=${vhost}-users,ou=vhosts,dc=com"}},68)"> %% Set the query to use when determining resource (e.g.,queue) access %% {resource_access_query,{constant,true}},68)"> %% Set queries to determine which tags a user has %% {tag_queries,[]} ]}].
5、Golang调用RabbitMQ的案例
下载Golgang运行amqp协议的包,在Rabbitmq官网上有提供现在的golang包来使用amqp协议与Rabbitmq交互 。
我们先将包下载到本地,然后就可以直接使用了:
$ go
githubstreadwayamqp
5.1、使用Golang来发送第一个hello idoall.org
在第一个教程中,我们写程序从一个命名的队列(test-idoall-queues)中发送和接收消息。
producer_hello.go(消息生产者):
package
mainimport("fmt""log""github.com/streadway/amqp")const //AMQP URI uri ="amqp://guest:guest@localhost:5672/"//Durable AMQP exchange name exchangeName ""//Durable AMQP queue name queueName "test-idoall-queues"//Body of message bodyMsg string"hello idoall.org")//如果存在错误,则输出func failOnErrorerr error msg string{ err !=nil log.Fatalf("%s: %s" msg err panicfmtSprintf))}}func main(){//调用发布消息函数 publishuri exchangeName queueName bodyMsgPrintf"published %dB OK" lenbodyMsg))}//发布者的方法////@amqpURI,amqp的地址//@exchange,exchange的名称//@queue,queue的名称//@body,主体内容func publishamqpURI exchange queue body ){//建立连接"dialing %q" amqpURI:= amqpDialamqpURI failOnErrorerr"Failed to connect to RabbitMQ" defer connectionClose()//创建一个Channel"got Connection,getting Channel" channelChannel"Failed to open a channel" defer channel"got queue,declaring %q" queue//创建一个queue qQueueDeclare// name// durable// delete when unused// exclusive// no-waitnil// arguments"Failed to declare a queue""declared queue,publishing %dB body (%q)"body body// Producer只能发送到exchange,它是不能直接发送到queue的。// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。// routing_key就是指定的queue名字。Publish exchange// exchangeName// routing key// mandatory// immediatePublishingHeadersTable{},128)">ContentType"text/plain"ContentEncoding""Body[]byte})"Failed to publish a message")}
consumer_hello(消息消费者).go
//Durable AMQP exchange nam
"test-idoall-queues"//调用消息接收者 consumer//接收者方法名称func consumer"Queue bound to Exchange,starting Consume"//订阅消息 msgsConsume// queue// consumer// auto-ack// no-local// args"Failed to register a consumer"//创建一个channel forever makechan bool//调用gorountine go func() d range msgs "Received a message: %s" d}()" [*] Waiting for messages. To exit press CTRL+C"//没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出<-forever Console1(运行producer):
/_code/
_rabbitmq_golang$ go run producer_hellogo2016072302:2951 dialing "amqp://guest:guest@localhost:5672/" got Connection getting Channel got queue declaring "test-idoall-queues" declared queue publishing 16B body "hello idoall.org") published OK
然后运行以下命令,可以看到我们刚才创建的queues在列表中
_golang$ rabbitmqctl list_queues
queues testidoallqueues 1
Console2(运行consumer)打印消息到屏幕,可以看到刚才我们通过producer发送的消息hello idoall.org
_golang$ go run consumer_hello
033314Queue bound to Exchange starting Consume[*]Waiting messagesexit press CTRL+CReceived a message hello idoallorg
5.2、Rabbitmq的任务分发机制
在5.1章节中,我们写程序从一个命名的队列中发送和接收消息。在这个章节中,我们将创建一个工作队列,将用于分配在多个工人之间的耗时的任务。
RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果任务队伍过多,那么只需要创建更多的Consumer来进行任务处理即可。
producer_task.go(消息生产者):
"os"
"strings""test-idoall-queues-task" bodyMsg bodyFromosArgsfunc bodyFromargs var s stringlenargs<|| os]=="" s "hello idoall.org"}else stringsJoin:],68)">" "return s consumer_task(消息消费者).go
"bytes"
"time" dot_count bytesCount"." t timeDurationdot_countSleept *Second"Done" 查看结果
Console1(consumer):
_golang$ go run consumer_task
1140"test-idoall-queues-task"C
Console2(consumer):
这个时候我们使用Producer 来 Publish Message:
_golang$ go run producer_task
go First message&& go run producer_taskSecond..Third...Fourth....Fifth.....171314B"First message." OK"Second message.."15"Third message..."1618B"Fourth message....""Fifth message....." 这时我们再看刚才打开的两个Consumer的结果:
Console1(consumer):
21
.Done...18Done
..
....20 默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin,也叫消息轮询
5.3、Message acknowledgment 消息确认
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们的代码,一旦RabbitMQ Server发送给Consumer消息后,会立即把这个Message标记为完成,然后从queue中删除。我们将无法再操作这个尚未处理完成的消息。
实际场景中,如果一个Consumer异常退出了,我们希望它处理的数据能够被另外的Consumer处理,这样数据在这种情况下(通道关闭、连接关闭、TCP连接丢失等情况)就不会丢失了。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,ack(nowledgments)是从Consumer消费后发送到一个特定的消息告诉RabbitMQ已经收到、处理结束,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message重新排进队列,发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。
这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。
消息确认默认是关闭的,我们需要通过,d.ACK(false)来告诉RabbitMQ我们已经完成任务。
producer_acknowledgments(消息生产者).go:
"test-idoall-queues-acknowledgments"
consumer_acknowledgments(消息消费者).go
Ack
查看结果
我们先使用Producer来发送一列消息:
_golang$ go run producer_acknowledgments
go run producer_acknowledgments2141"test-idoall-queues-acknowledgments"414243 通过rabbitmqctl命令,来看下messages_unacknowledged的情况:
_golang$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
queuestask 00acknowledgments 50
使用Consumer来订阅消息操作到第三条的时候,我们按CTRL+C退出,这个时候相当于消息已经被读取,但是未发送d.ACK(false):
_golang$ go run consumer_acknowledgments
56353638...^Csignal interrupt
再通过rabbitmqctl命令可以看到,还是有3条消息未处理
3
0
5.4、Message durability消息持久化
如果服务器死机或程序 crash了,数据仍然会丢失。为了确保消息不会丢失,我们需要将queue和Message做持久化操作。
将durable设置为true可以做持久化处理(生产者和消息者的代码里都要设置),如果是已经存在的一个queue 没有设置过持久化,再重新设置是不起作用的,我们需要重新为queue设置一个名字。
最后在Producer发布消息的时候,我们需要设置DeliveryMode为amqp.Persistent,持久化的工作就做完了,下面我们来看代码
producer_durability.go(消息生产者):
"test-idoall-queues-durability"
DeliveryModePersistent consumer_durability.go(消息接收者):
_rabbitmq