
RocketMQ 4.7.1 Cluster Setup

I. Notes Before Deployment

This setup follows the official documentation; the deployment guide on the official site is at:

https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy

Three servers are used in multi-node (cluster) multi-replica mode with asynchronous replication. The layout is shown in the table below:

Server      IP Address         NameServer               Broker Nodes
Server 1    120.120.181.204    120.120.181.204:9876     (none)
Server 2    120.120.181.208    120.120.181.208:9876     broker-a (master), broker-b-s (slave)
Server 3    120.120.181.209    120.120.181.209:9876     broker-b (master), broker-a-s (slave)

II. Installation and Deployment

1. Install the JDK

JDK installation needs no detailed walkthrough here; a quick verification sketch follows.
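
A minimal sketch, assuming a yum-based distribution such as CentOS (the package name is an assumption; any JDK 8 works, since RocketMQ 4.x targets Java 8):

# Install OpenJDK 8 and confirm the version
yum install -y java-1.8.0-openjdk-devel
java -version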

2. Install RocketMQ

Download it from the official site:

https://rocketmq.apache.org/zh/download
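
As a rough sketch (the archive name assumes the 4.7.1 binary release, and the extracted directory name may differ slightly by release), unpack it to the /usr/local/rocketmq path used throughout this guide:

# Unpack the binary release and move it to /usr/local/rocketmq
unzip rocketmq-all-4.7.1-bin-release.zip
mv rocketmq-all-4.7.1-bin-release /usr/local/rocketmq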

3. Adjust the RocketMQ JVM heap size (modify as needed)

cd /usr/local/rocketmq/bin
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

4. Start the NameServers

Server 1

nohup ./mqnamesrv -n 120.120.181.204:9876 &

Server 2

nohup ./mqnamesrv -n 120.120.181.208:9876 &

Server 3

nohup ./mqnamesrv -n 120.120.181.209:9876 &
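
To confirm each NameServer is up, a quick check on each machine (the log path assumes the default ~/logs/rocketmqlogs location):

# The NameServer main class should appear in the JVM process list
jps -l | grep -i namesrv
# Check the NameServer log for the boot-success message
tail -n 50 ~/logs/rocketmqlogs/namesrv.log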

5. Configure the Brokers

Server 2

broker-a.properties

# Name of the cluster this broker belongs to
brokerClusterName=DefaultCluster
# Broker name
brokerName=broker-a
# IP of the server the broker runs on
brokerIP1=120.120.181.208
# Broker id: 0 means master, >0 means slave
brokerId=0
# Time of day to delete expired files, default 4 AM
deleteWhen=04
# Keep files for 48 hours
fileReservedTime=48
# Broker role: asynchronous-replication master
brokerRole=ASYNC_MASTER
# Use asynchronous disk flush
flushDiskType=ASYNC_FLUSH
# NameServer address list
namesrvAddr=120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
# Number of queues created when a topic is auto-created on send, default 4
defaultTopicQueueNums=4
# Whether the broker may auto-create topics; enable in dev/test, disable in production
autoCreateTopicEnable=true
# Whether the broker may auto-create subscription groups; enable in dev/test, disable in production
autoCreateSubscriptionGroup=true
# Port the broker listens on
listenPort=10911
# Abort file path
abortFile=/usr/local/rocketmq/store/abort
# Message store root path
storePathRootDir=/usr/local/rocketmq/store
# CommitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# Consume queue storage path
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# Message index storage path
storePathIndex=/usr/local/rocketmq/store/index
# Checkpoint file path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# Maximum message size
maxMessageSize=65536
# Size of each CommitLog file, default 1 GB
mapedFileSizeCommitLog=1073741824
# Entries per ConsumeQueue file, default 300,000; tune per workload
mapedFileSizeConsumeQueue=300000

Server 2

broker-b-s.properties

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerIP1=120.120.181.208
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
abortFile=/usr/local/rocketmq/store-slave/abort
storePathRootDir=/usr/local/rocketmq/store-slave
storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
storePathIndex=/usr/local/rocketmq/store-slave/index
storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
maxMessageSize=65536

Server 3

broker-a-s.properties

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerIP1=120.120.181.209
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
abortFile=/usr/local/rocketmq/store-slave/abort
storePathRootDir=/usr/local/rocketmq/store-slave
storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
storePathIndex=/usr/local/rocketmq/store-slave/index
storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
maxMessageSize=65536

Server 3

broker-b.properties

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerIP1=120.120.181.209
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
abortFile=/usr/local/rocketmq/store/abort
storePathRootDir=/usr/local/rocketmq/store
storePathCommitLog=/usr/local/rocketmq/store/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
storePathIndex=/usr/local/rocketmq/store/index
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
maxMessageSize=65536
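
These four property files are assumed to live under conf/2m-2s-async/, which is where the start commands in step 7 look for them; per server the placement would be:

# Server 2
/usr/local/rocketmq/conf/2m-2s-async/broker-a.properties    # broker-a master
/usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties  # broker-b slave
# Server 3
/usr/local/rocketmq/conf/2m-2s-async/broker-b.properties    # broker-b master
/usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties  # broker-a slave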

6. Modify runbroker.sh on servers 2 and 3 (as needed)

Adjust the broker JVM heap; the default is 8 GB, change it to match your hardware:

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

7. Start the Brokers

Server 2

nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &

Server 3

nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
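
To check that all four brokers registered with the NameServers, the clusterList subcommand (run from bin against any of the three NameServers) is handy; expect broker-a and broker-b to each show a master (BID 0) and a slave (BID 1):

./mqadmin clusterList -n 120.120.181.204:9876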

8. Verify the Cluster

Use the example tools shipped with RocketMQ to verify that the cluster works.

On server 2, set the NAMESRV_ADDR environment variable so that the example producer and consumer launched via tools.sh can read it:

export NAMESRV_ADDR='120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876'

Start the producer

./tools.sh org.apache.rocketmq.example.quickstart.Producer

Start the consumer

./tools.sh org.apache.rocketmq.example.quickstart.Consumer

9. The mqadmin Admin Tool

RocketMQ ships a command-line tool for managing topics, brokers, clusters, messages, and so on. For example, mqadmin can create a topic:

./mqadmin updateTopic -n 120.120.181.208:9876 -c DefaultCluster -t myTopic1
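
To confirm the topic exists and see which brokers it was created on (topicList and topicRoute are standard mqadmin subcommands):

# List all topics known to the cluster
./mqadmin topicList -n 120.120.181.208:9876
# Show the brokers and queues behind the new topic
./mqadmin topicRoute -n 120.120.181.208:9876 -t myTopic1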

10. Web Console

RocketMQ itself does not ship a visual management console; you can use the third-party console project: https://github.com/apache/rocketmq-externals/tree/rocketmq-console-1.0.0/rocketmq-console

1. Download the management console

2. Unzip it

3. Install Maven

4. The downloaded console is a Maven project; change the cluster (NameServer) address in application.yml, build a jar with Maven, and run the jar (a build-and-run sketch follows)
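
A minimal build-and-run sketch (the jar name and the exact config key are assumptions that depend on the console version; rocketmq.config.namesrvAddr can also be passed on the command line instead of editing the config file):

cd rocketmq-externals/rocketmq-console
mvn clean package -Dmaven.test.skip=true
# Point the console at the cluster NameServers at startup
java -jar target/rocketmq-console-ng-1.0.0.jar \
  --rocketmq.config.namesrvAddr='120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876'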

III. Integrating Spring Boot with the RocketMQ Cluster

Producer:

1. Add the dependency

<!-- RocketMQ dependencies -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-lang3</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.alibaba</groupId>
                    <artifactId>fastjson</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

I excluded quite a few transitive dependencies here because the Dependency Analyzer tool flagged a number of conflicts in my own project, so I cleaned them up manually. Adjust the exclusions to fit your own project; a quick way to spot conflicts from the command line is sketched below.
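
For example, a generic Maven check (not specific to this project) that helps decide what to exclude:

mvn dependency:tree -Dverbose
# The verbose tree annotates duplicate and conflicting versions, which tells you what to exclude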

2. Configuration file

# RocketMQ configuration
rocketmq:
  consumer:
    group: mes_consumer_group
    # Maximum number of messages fetched per pull (pull size, not the consume batch size)
    pull-batch-size: 10
  name-server: 120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
  producer:
    # Producers sending the same kind of message share one group; keep it unique
    group: mes_producer_group
    # Send timeout in milliseconds, default 3000
    send-message-timeout: 10000
    # Retries for failed synchronous sends, default 2
    retry-times-when-send-failed: 3
    # Retries for failed asynchronous sends, default 2
    retry-times-when-send-async-failed: 3
    # Maximum message size; the framework default is 1024*1024*4 (4 MB)
    max-message-size: 4096
    # Compress message bodies above this threshold, default 4 KB (1024*4)
    compress-message-body-threshold: 4096
    # Whether to retry on another broker when an internal send fails, default false
    retry-next-server: false

This configuration came from an early test environment that ran a standalone RocketMQ; I reused it and only changed name-server to the cluster addresses. The remaining settings depend on your situation. Take max-message-size as an example: we originally sent collected hardware data record by record, but some devices produced payloads larger than the configured max-message-size and sends started failing, so the value had to be raised.

3. Core code

    @Scheduled(cron = "0/15 * * * * ?")
    public void startCollectData() {
        try {
            String lastAutoNo = getLastAutoNo();
            List<AnalyseResult> analyseResults = analyseResultService.list(lastAutoNo);
            if (CollUtil.isNotEmpty(analyseResults)) {
                List<AnalyseResult> filterList = analyseResults.stream().filter(item -> isLegalSampleNo(item.getSampleNo())).collect(Collectors.toList());
                if (CollUtil.isNotEmpty(filterList)) {
                    // Assemble the message body
                    Map<String, Object> map = new HashMap<>(4);
                    map.put("customNo", APPLICATION_NAME);
                    map.put("deviceName", DeviceEnum.getDeviceNameByCustomNo(APPLICATION_NAME));
                    map.put("time", DateUtil.formatDate(new Date(), Constants.DATETIME_FORMAT_STANDARD));
                    for (AnalyseResult analyseResult : filterList) {
                        map.put("data", analyseResult);
                        log.info("采集到数据:{}", analyseResult);
                        rocketMQTemplate.convertAndSend(MQ_TOPIC_ANALYSE_RESULT, map);
                    }
                    String newLastAutoNo = analyseResults.get(0).getAutoNo();
                    redisTemplate.opsForValue().set(DeviceEnum.getRedisKeyByCustomNo(APPLICATION_NAME), newLastAutoNo, 1, TimeUnit.DAYS);
                }
            }
        } catch (Exception e) {
            log.error("任务执行出错,错误:{}", e.getMessage(), e);
        }
    }

The code above is a scheduled data-collection task: every 15 seconds it queries the latest hardware data, filters out garbage records, publishes ordinary messages to the target topic, and then records the offset (auto number) of the most recently collected row.

PS: this code was written quite a while ago and never really optimized since there was a lot going on at the time; looking at it now, there is plenty of room for improvement.

In addition, the application's main class sets one extra system property:

    public static void main(String[] args) {
        System.setProperty("rocketmq.client.logUseSlf4j","true");
        SpringApplication.run(ImesDataCollectSdApplication.class, args);
    }

This works around the RocketMQ client log-printing issue by routing the client logs through SLF4J.

Consumer side:

1. Add the dependency

<!-- RocketMQ dependencies -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-all</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-lang3</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.alibaba</groupId>
                    <artifactId>fastjson</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.hibernate</groupId>
                    <artifactId>hibernate-validator</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2. Configuration file

# RocketMQ configuration
rocketmq:
  consumer:
    group: mes_consumer_group
    # Maximum number of messages fetched per pull (pull size, not the consume batch size)
    pull-batch-size: 10
  name-server: 120.120.181.204:9876;120.120.181.208:9876;120.120.181.209:9876
  producer:
    # Producers sending the same kind of message share one group; keep it unique
    group: mes_producer_group
    # Send timeout in milliseconds, default 3000
    send-message-timeout: 10000
    # Retries for failed synchronous sends, default 2
    retry-times-when-send-failed: 3
    # Retries for failed asynchronous sends, default 2
    retry-times-when-send-async-failed: 3
    # Maximum message size; the framework default is 1024*1024*4 (4 MB)
    max-message-size: 1048576
    # Compress message bodies above this threshold, default 4 KB (1024*4)
    compress-message-body-threshold: 4096
    # Whether to retry on another broker when an internal send fails, default false
    retry-next-server: false

3. Core code

package com.imes.api.mq.consumer;


import cn.hutool.core.collection.CollUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imes.api.common.enums.DeviceEnum;
import com.imes.api.service.QpesRawSampleInspService;
import com.imes.api.service.QpesRawSampleService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * MQ consumer for the analyzer result topics
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "", consumerGroup = "mes_consumer_group")
public class AnalyzerResultConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    @Autowired
    private QpesRawSampleService qpesRawSampleService;

    @Autowired
    private QpesRawSampleInspService qpesRawSampleInspService;

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void onMessage(String payload) {
        try {
            if (StringUtils.isNotBlank(payload)) {
                log.info("【MQ接收到消息】 线程:{} , payload:{}", Thread.currentThread().getName(), payload);
                Map<String, Object> map = objectMapper.readValue(payload, new TypeReference<Map<String, Object>>() {
                });
                //String deviceName = (String) map.get("deviceName");
                String customNo = (String) map.get("customNo");
                //String time = (String) map.get("time");
                Object data = map.get("data");
                Class<?> clazz = DeviceEnum.getClassByDeviceNo(customNo);
                if (Objects.nonNull(clazz)) {
                    // Create an instance of the device-specific handler
                    Object instance = clazz.newInstance();
                    // Look up the handler's deal method
                    Method targetMethod = clazz.getMethod("deal", String.class, QpesRawSampleService.class, QpesRawSampleInspService.class);
                    // Invoke it with the raw data and the services it needs
                    targetMethod.invoke(instance, objectMapper.writeValueAsString(data), qpesRawSampleService, qpesRawSampleInspService);
                }
            }
        } catch (Exception e) {
            log.error("采集数据解析出错:{}", e.getMessage());
        }
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        try {
            consumer.subscribe("industrial_analyzer_topic", "*");// 工业分析仪
            consumer.subscribe("calorimeter_topic", "*");// 量热仪
            consumer.subscribe("automatic_sulfur_analyzer_topic", "*");// 自动定硫仪
            consumer.subscribe("fully_automatic_coke_reactivity_and_post_reaction_strength_tester_topic", "*");// 全自动焦炭反应性及反应后强度测定仪
            consumer.subscribe("x_ray_fluorescence_spectrometer_topic", "*");// x荧光光谱仪
            consumer.subscribe("infrared_carbon_sulfur_analyzer_topic", "*");// 红外碳硫仪
            consumer.subscribe("automatic_lime_activity_detector_topic", "*");//石灰活性度自动检测仪
            consumer.subscribe("icp_emission_spectrometer_topic", "*");// ICP发射光谱仪
            // TODO 气象色普仪
            // TODO 全自动快速测试仪
            // TODO 直读光谱仪
            consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {
                if (CollUtil.isNotEmpty(messages)) {
                    for (MessageExt message : messages) {
                        onMessage(new String(message.getBody()));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
        } catch (MQClientException e) {
            log.error("prepareStart error: {}",e.getMessage());
        }
    }
}

In this code, consumer.subscribe registers multiple topics; these are exactly the topics the producers publish to (the topic attribute on @RocketMQMessageListener is left empty because the real subscriptions are set up in prepareStart). The code combines reflection with a strategy pattern: devices from different manufacturers and of different models report data in different formats, and even the same model from the same manufacturer can differ, so each device essentially needs its own handler for the collected results.

PS: originally I used a single topic. All producer messages went to the same topic, and the device number carried in the Message was used to tell devices apart before parsing the data.

Later, for other reasons, each device type got its own topic. While testing I noticed that with multiple topics message consumption felt slightly slower than with a single topic (relatively speaking, on the order of milliseconds).

