RocketMQ搭建步骤
开发环境
- 64位 centos7(虚拟机,1G内存)
- 64位 jdk1.8
- maven 3.5.0
- Git
- tomcat(用于启动rocketmq-console)
- rocketmq 3.2.6(最好选择maven仓库中已有的版本,保持客户端依赖的jar包和服务器版本一致)
- rocketmq-console
环境变量配置
vi /etc/profile 打开文件配置如下:
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk JRE_HOME=$JAVA_HOME/jre CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib M2_HOME=/usr/maven/ ROCKETMQ_HOME=/usr/rocketmq PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$M2_HOME/bin:$ROCKETMQ_HOME/bin export JAVA_HOME JRE_HOME CLASS_PATH M2_HOME ROCKETMQ_HOME PATH export NAMESRV_ADDR=127.0.0.1:9876
source /etc/profile 使配置文件立即生效
防火墙配置
宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙
service iptables stop 关闭防火墙 service iptables status 查看防火墙的状态 service iptables start 启动防火墙
或者为了安全,只开放特定的端口号,如8080、9876、10911等等,此处不再赘述。
安装、启动RocketMQ
1.下载和安装
cd /usr wget https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz tar -zxvf alibaba-rocketmq-3.2.6.tar.gz mv alibaba-rocketmq-3.2.6 rocketmq
cd rocketmq/bin 进入rocketmq核心命令文件目录
2.设置可执行权限
chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv
3.修改jvm参数
vim修改runserver.sh和runbroker.sh的jvm参数如下(根据虚拟机内存大小设置,超出内存大小可能会报错): JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"
4.启动nameserver
nohup sh mqnamesrv &
5.配置broker
(1)创建broker配置文件
mkdir ../conf/me-2m-2s-async/ sh mqbroker -m >../conf/me-2m-2s-async/broker.p
(2)修改brokerIP
vi ../conf/me-2m-2s-async/broker.p brokerIP1=192.168.x.x 显示指定为虚拟机的外网IP,不要用localhost和127.0.0.1,因为远程主机会根据brokerIP1指定的地址去访问broker
6.启动broker
nohup sh mqbroker -n localhost:9876 -c ../conf/me-2m-2s-async/broker.p &
7.检查nameserver和broker是否启动成功
执行jps,输出以下进程表示启动成功
8464 NamesrvStartup 8618 BrokerStartup
或者,查看nuhup.out日志文件,有如下信息表示启动成功
The Name Server boot success. The broker[localhost.localdomain,192.168.x.x:10911] boot success. and name server is localhost:9876
或者,启动rocketmq自带的Producer和Consumer程序,若可正常发送和消费消息,则表示服务启动成功
bash tools.sh com.alibaba.rocketmq.example.quickstart.Producer #生产者 bash tools.sh com.alibaba.rocketmq.example.quickstart.Consumer #消费者
sh mqshutdown broker sh mqshutdown namesrv
安装、启动rocketmq-console
wget https://github.com/duomu/rocketmq-console/raw/master/rocketmq-console.war 下载 将rocketmq-console.war放在/usr/tomcat/webapps目录下 sh /usr/tomcat/bin/startup.sh 启动tomcat
虚拟机本地访问http://localhost:8080/rocketmq-console,显示如下页面表示启动成功
宿主机远程访问http://192.168.x.x:8080/rocketmq-console,若无法访问,请检查防火墙是否关闭或者是否开放了8080端口号。
编写测试程序
在宿主机(windows)上编写如下测试程序:
依赖配置
//此处只列出mq相关的依赖 <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency>
创建生产者
package com.fuscent.infoquery.practice.rocketmq; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import org.apache.log4j.Logger; /** * @author:duomu * @date:2017/8/4 18:09 */ public class MqProducer { private static Logger logger = Logger.getLogger(MqProducer.class); public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("192.168.229.132:9876"); try { producer.start(); logger.info("producer启动成功"); for (int i = 0; i < 5; i++) { Message msg = new Message("TopicA","tagA","OrderID188","Hello world".getBytes()); SendResult result = producer.send(msg); logger.info("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } } catch (Exception e) { logger.error("发送消息失败,Exception error:" + e); } finally { producer.shutdown(); } } }
创建消费者
package com.fuscent.infoquery.practice.rocketmq; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; import org.apache.log4j.Logger; import java.util.List; /** * @author:duomu * @date:2017/8/4 18:09 */ public class MqConsumer { private static Logger logger = Logger.getLogger(MqConsumer.class); public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_yll"); consumer.setNamesrvAddr("192.168.229.132:9876"); try { consumer.subscribe("TopicA","tagA||tagB");//可订阅多个tag,但是一个消息只能有一个tag consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) { Message msg = list.get(0); logger.info(msg.toString()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); logger.info("consumer启动成功"); } catch (MQClientException e) { logger.error("消费者订阅消息失败,error:" + e); } } }
测试生成者和消费者
启动生成者
启动消费者
总结
前人栽树,后人乘凉,在baidu+google了n篇文章后,终于把rocketmq搭建成功了,虽然只是单机配置,但是把该踩的坑都踩了,集群搭建应该只是多配几台服务而已,后续再研究啦~~~
坑1
在github上下载了最新的rocketmq4.1.0,后来发现maven中央仓库还没有4.1.0的rocketmq-client依赖包,后来下载了3.5.8,也没有调成功,索性下载一个比较早期的版本,选了3.2.6,我们公司用的3.2.4,比我们公司的早一点点应该不会太差。。。
坑2
nameserver和broker启动成功,宿主机上的生产者发送消息失败,报如下错误,且指向错误码33/44/50:
com.alibaba.rocketmq.client.exception.MQClientException:Send [1] times,still Failed,cost [75]ms,...
出现这个问题首先要查看虚拟机本地的producer是否可以正常发送消息,如果本地收发消息正常,那么一定远程访问的过程中出了问题,可能是端口号没开放,也可能是IP地址映射有问题。
对于端口号,我已经确定了n遍,防火墙是关闭的,最初还没有考虑到IP地址的问题,所以百思不得其解,从阿里官方渠道获取了错误33/44/50的解决方案,试了一下也没用,把rocketmq3.2.6源码里面的Producer跑了一下也是报那个错误,错误44的说明里写着可能是producer没有正确连接到NameServer,我知道没有连接成功,可是防火墙我都关闭了还能有什么原因呢。
捣鼓了大半天,就卡在这个问题上了,我想我一定是漏掉了什么,反反复复看38/44/50的错误说明,直到看到错误50说明里面的这一句话:
然后我注意到下面这个嵌套错误,debug了一下,也没看出什么,当时我还以为这个ip是虚拟机的局域网ip
接着就baidu+google,偶然google出一篇思路别具一格的文章,说rocketmq自动识别网络出错,要把其他网络关掉,我之前学习docker的时候的确在虚拟机上配了docker的网络。
然后就尝试关掉docker的网络(172.17.0.1),可是关掉了还是照样报上面的错误啊。。。
真的没有办法了,今天早晨来了突然想到,能够访问外网ip不能访问局域网ip,ping一下看看吧,果然局域网ip ping不通,由于对网络、虚拟机了解的不深,我就去求教网络童鞋了,问宿主机怎么能够访问虚拟机的局域网ip(我用的NAT模式),网络童鞋说你用桥接模式吧,当时心中暗喜,心想吼吼我的大难题就要这么简单的解决了,网络童鞋走后,我就试了一下,麻蛋为什么用桥接模式分了新的ip(172.16.2.129),还是报上面那个172.17.0.1的错误。。
第一次搭rocketmq,想尽快调通,基本上都是用的默认配置,而且默认配置一般不会有问题啊,自己写配置才容易出错,然鹅万能的百度告诉我我之前先入为主的观念是错的,我想这应该是终极解决方案了吧。。
原来broker自动寻的地址是172.17.0.1,而且深深的刻在了默认配置文件里,虽然我关掉了这个网络,配置文件里还是这个地址,然后我重新写了个配置文件,强制指定broker所在的机器ip为192.168.x.x,重启服务,大功告成!
和局域网ip能否ping通无关,我把网络连接改回了NAT模式,感谢网络童鞋的帮忙,我要好好补一下网络和虚拟机的知识了。。。
参考资料
附上最有价值的几个~~
http://rocketmq.apache.org/docs/quick-start/ 官方资料,搭建mq之前最好把User Guide都看一遍
https://firsh.me/2017/07/19/rocketmq-p-c/
http://www.jb51.cc/article/p-umqhlrep-bov.html 坑2的终极解决方案
http://www.cnblogs.com/badboyf/p/6611774.html