一、部署前的说明
本文根据官网资料搭建,官方网站部署文档地址:
本文采用3台服务器,多节点(集群)多副本模式-异步复制,具体看表格:
二、安装部署
1、安装JDK
JDK安装就不用多说了
2、安装rocketmq
官网下载地址:
3、修改rocketmq的jvm内存大小(按需修改)
cd /usr/local/rocketmq/bin
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
4、启动nameserver
第一台
nohup ./mqnamesrv -n 120.120.181.204:9876 &
第二台
nohup ./mqnamesrv -n 120.120.181.208:9876 &
第三台
nohup ./mqnamesrv -n 120.120.181.209:9876 &
5、配置borker
服务器2
broker-a.properties
# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=120.120.181.208
# broker的id,0表示master,>0表示slave
brokerId=0
# 删除文件时间点,默认在凌晨4点
deleteWhen=04
# 文件保留时间为48小时
fileReservedTime=48
# broker的角色为master
brokerRole=ASYNC_MASTER
#使用异步刷盘的方式
flushDiskType=ASYNC_FLUSH
#名称服务器的地址列表
namesrvAddr=120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
# 在发送消息自动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端口
listenPort=10911
# abort文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog存储路径炸
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# 限制的消息大小
maxMessageSize=65536
# commitLog每个文件的大小默认1G次
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
服务器2
broker-b-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerIP1=120.120.181.208
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
defaultTopicQueueNums=4
autocreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
abortFile=/usr/local/rocketmq/store-slave/abort
storePathRootDir=/usr/local/rocketmq/store-slave
storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
storePathIndex=/usr/local/rocketmq/store-slave/index
storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
maxMessageSize=65536
服务器3
broker-a-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerIP1=120.120.181.209
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
abortFile=/usr/local/rocketmq/store-slave/abort
storePathRootDir=/usr/local/rocketmq/store-slave
storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
storePathIndex=/usr/local/rocketmq/store-slave/index
storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
maxMessageSize=65536
服务器3
broker-b.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerIP1=120.120.181.209
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
abortFile=/usr/local/rocketmq/store/abort
storePathRootDir=/usr/local/rocketmq/store
storePathCommitLog=/usr/local/rocketmq/store/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
storePathIndex=/usr/local/rocketmq/store/index
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
maxMessageSize=65536
6、修改服务器2和服务器3的runbroker.sh文件(按需修改)
修改jvm内存,默认8g,自定义修改
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
7、启动broker
服务器2
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
服务器3
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
8、验证集群
使用RocketMQ提供的tools工具验证集群是否正常工作,
在服务器2上配置环境变量
用于被tools中的生产者和消费者程序读取该变量。
export NAMESRV_ADDR='120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876'
·启动生产者
./tools.sh org.apache.rocketmq.example.quickstart.Producer
启动消费者
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
9、mqadmin管理工具
RocketMO提供了命令工具用于管理topic、broker、集群、消息等。比如可以使用mqadmin创建topic:
./mqadmin updateTopic -n 120.120.181.208:9876 -c DefaultCluster -t myTopicl
10、可视化控制台
RocketMO没有提供可视化管理控制平台,可以使用第三方管理控制平台:https://github.com/apache/rocketmq-externals/tree/rocketmq-console-1.0.0/rocketmq-console
1.下载管理控制台
2.解压缩
3.安装maven
4.修改下载的console项目,是一个maven项目,修改application.yml中集群地址,通过maven打包jar文件,运行jar
三、springboot整合rocketmq集群
生产者:
1.添加依赖
<!--RocketMQ相关依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</exclusion>
</exclusions>
</dependency>
我这边依赖中排除了很多依赖,是因为在自己的项目中,通过Dependency Analyzer工具发现很多依赖冲突,这里自己处理了下。具体情况看自己项目~~~
2.配置文件
#rockermq配置
rocketmq:
consumer:
group: mes_consumer_group
#一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
name-server: 120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
producer:
#发送同一类消息的设置为同一个group,保证唯一
group: mes_producer_group
#发送消息超时时间,默认3000
send-message-timeout: 10000
#发送消息失败重试次数,默认2
retry-times-when-send-failed: 3
#异步消息重试此处,默认2
retry-times-when-send-async-failed: 3
#消息最大长度,默认1024*1024*4(默认4M)
max-message-size: 4096
#压缩消息阈值,默认4k(1024*4)
compress-message-body-threshold: 4096
#是否在内部发送失败时重试另一个broker,默认false
retry-next-server: false
这段配置是早期测试环境的配置,当时只是单体的rocketmq,我直接拿过来的,将name-server地址修改成了集群地址。其它的设置还需要看具体情况而定,比如max-message-size,记得之前采集硬件数据的时候,统一的是按照“条”来发送,但是部分设备的数据太大,超过了max-message-size设置的值,然后就出问题了,当时就把max-message-size的值调大了一些
3.核心代码
@Scheduled(cron = "0/15 * * * * ?")
public void startCollectData() {
try {
String lastAutoNo = getLastAutoNo();
List<AnalyseResult> analyseResults = analyseResultService.list(lastAutoNo);
if (CollUtil.isNotEmpty(analyseResults)) {
List<AnalyseResult> filterList = analyseResults.stream().filter(item -> isLegalSampleNo(item.getSampleNo())).collect(Collectors.toList());
if (CollUtil.isNotEmpty(filterList)) {
// 组装消息体
Map<String, Object> map = new HashMap<>(4);
map.put("customNo", APPLICATION_NAME);
map.put("deviceName", DeviceEnum.getDeviceNameByCustomNo(APPLICATION_NAME));
map.put("time", DateUtil.formatDate(new Date(), Constants.DATETIME_FORMAT_STANDARD));
for (AnalyseResult analyseResult : filterList) {
map.put("data", analyseResult);
log.info("采集到数据:{}", analyseResult);
rocketMQTemplate.convertAndSend(MQ_TOPIC_ANALYSE_RESULT, map);
}
String newLastAutoNo = analyseResults.get(0).getAutoNo();
redisTemplate.opsForValue().set(DeviceEnum.getRedisKeyByCustomNo(APPLICATION_NAME), newLastAutoNo, 1, TimeUnit.DAYS);
}
}
} catch (Exception e) {
log.error("任务执行出错,错误:{}", e.getMessage(), e);
}
}
上面的代码是一个数据采集的定时任务,每15秒查询最新的硬件数据,过滤掉垃圾数据后,投递普通消息到指定topic,然后记录最新采集数据的offset。
ps:这段代码很早就写的,没有怎么优化,当时事情也比较多,就没有处理。现在看这段代码,优化的地方还是挺多的。
另外启动类添加了一段代码:
public static void main(String[] args) {
System.setProperty("rocketmq.client.logUseSlf4j","true");
SpringApplication.run(ImesDataCollectSdApplication.class, args);
}
rocketmq打印日志问题
消费端:
1.添加依赖
<!--RocketMQ相关依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</exclusion>
<exclusion>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</exclusion>
</exclusions>
</dependency>
2.配置文件
#rockermq配置
rocketmq:
consumer:
group: mes_consumer_group
#一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
name-server: 120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876;
producer:
#发送同一类消息的设置为同一个group,保证唯一
group: mes_producer_group
#发送消息超时时间,默认3000
send-message-timeout: 10000
#发送消息失败重试次数,默认2
retry-times-when-send-failed: 3
#异步消息重试此处,默认2
retry-times-when-send-async-failed: 3
#消息最大长度,默认1024*1024*4(默认4M)
max-message-size: 1048576
#压缩消息阈值,默认4k(1024*4)
compress-message-body-threshold: 4096
#是否在内部发送失败时重试另一个broker,默认false
retry-next-server: false
3.核心代码
package com.imes.api.mq.consumer;
import cn.hutool.core.collection.CollUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imes.api.common.enums.DeviceEnum;
import com.imes.api.service.QpesRawSampleInspService;
import com.imes.api.service.QpesRawSampleService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* mq消费端 analyzer_result_topic
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "", consumerGroup = "mes_consumer_group")
public class AnalyzerResultConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Autowired
private QpesRawSampleService qpesRawSampleService;
@Autowired
private QpesRawSampleInspService qpesRawSampleInspService;
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void onMessage(String payload) {
try {
if (StringUtils.isNotBlank(payload)) {
log.info("【MQ接收到消息】 线程:{} , payload:{}", Thread.currentThread().getName(), payload);
Map<String, Object> map = objectMapper.readValue(payload, new TypeReference<Map<String, Object>>() {
});
//String deviceName = (String) map.get("deviceName");
String customNo = (String) map.get("customNo");
//String time = (String) map.get("time");
Object data = map.get("data");
Class clazz = DeviceEnum.getClassByDeviceNo(customNo);
if (Objects.nonNull(clazz)) {
// 创建实例
Object instance = clazz.newInstance();
// 获取指定方法
Method targetMethod = clazz.getMethod("deal", String.class,QpesRawSampleService.class,QpesRawSampleInspService.class);
// 调用方法
targetMethod.invoke(instance, objectMapper.writeValueAsString(data),qpesRawSampleService,qpesRawSampleInspService);
}
}
} catch (Exception e) {
log.error("采集数据解析出错:{}", e.getMessage());
}
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
try {
consumer.subscribe("industrial_analyzer_topic", "*");// 工业分析仪
consumer.subscribe("calorimeter_topic", "*");// 量热仪
consumer.subscribe("automatic_sulfur_analyzer_topic", "*");// 自动定硫仪
consumer.subscribe("fully_automatic_coke_reactivity_and_post_reaction_strength_tester_topic", "*");// 全自动焦炭反应性及反应后强度测定仪
consumer.subscribe("x_ray_fluorescence_spectrometer_topic", "*");// x荧光光谱仪
consumer.subscribe("infrared_carbon_sulfur_analyzer_topic", "*");// 红外碳硫仪
consumer.subscribe("automatic_lime_activity_detector_topic", "*");//石灰活性度自动检测仪
consumer.subscribe("icp_emission_spectrometer_topic", "*");// ICP发射光谱仪
// TODO 气象色普仪
// TODO 全自动快速测试仪
// TODO 直读光谱仪
consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {
if (CollUtil.isNotEmpty(messages)) {
for (MessageExt message : messages) {
onMessage(new String(message.getBody()));
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
} catch (MQClientException e) {
log.error("prepareStart error: {}",e.getMessage());
}
}
}
这段代码中,通过consumer.subscribe订阅了多个topic,这边订阅的topic就是生产者投递消息的目标topic。代码中使用了反射和策略模式。由于不通厂不通型号的设备数据格式不一样,甚至同一厂商同一型号的设备也有所差异,因此基本上需要针对每台设备单独处理采集结果。
ps:最开始的时候,我使用的是一个topic,生产者的消息都投递到同一个topic中,根据Message中携带的设备编号区分设备,然后解析数据。
但是后来基于某种考虑,每种类型的设备分到同一个topic,然后在测试的时候我发现,分多个topic后,感觉上消息的消费比单个topic有所延迟(相对而言的,毫秒级别)