1 生产者
1.1 发送消息注意事项
1 Tags的使用
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。
2 Keys的使用
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
123// 订单Id String orderId = "20034568923546"; message.setKeys(orderId);
3 日志的打印
消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。以下对每个状态进行说明:
SEND ...
1 消息存储
消息存储是 RocketMQ 中最为复杂和最为重要的一部分,本节将分别从
RocketMQ 的消息存储整体架构、PageCache 与 Mmap 内存映射以及 RocketMQ
中两种不同的刷盘方式三方面来分别展开叙述。
1.1 消息存储整体架构
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。 (1)
CommitLog:消息主体以及元数据的存储主体,存储 Producer
端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G
,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如
00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为
1G=1073741824;当第一个文件写满了,第二个文件为
00000000001073741824,起始偏移量为
1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于
RocketMQ 是基于主题 topic
的订阅模式,消息消费是针对主题进行的,如果 ...
1 基本样例
在基本样例中我们提供如下的功能场景:
使用RocketMQ发送三种类型的消息:同步消息、异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
使用RocketMQ来消费接收到的消息。
1.1 加入依赖:
maven:
12345<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version></dependency>
gradle
1compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
1.2 消息发送
1、Producer端发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
1234567891011121314151617181920212223public ...
1. 权限控制特性介绍
权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过
RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在distribution/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常;
ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。
2. 权限控制的定义与属性值
2.1权限定义
对RocketMQ的Topic资源访问权限控制定义主要如下表所示,分为以下四种
|权限|含义| |-|-| |DENY |拒绝| |ANY |PUB 或者 SUB 权限| |PUB |发送权限|
|SUB |订阅权限|
2.2 权限定义的关键属性
字段
取值
含义
globalWhiteRemoteAddres ...
技术架构
RocketMQ架构上主要分为四部分,如上图所示:
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。
主要包括两个功能:
Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行 ...
1 集群搭建
1.1 单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
1)启动 NameServer
123456### 首先启动Name Server$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功$ tail -f ~/logs/rocketmqlogs/namesrv.logThe Name Server boot success...
2)启动 Broker
123456### 启动Broker$ nohup sh bin/mqbroker -n localhost:9876 &### 验证Name Server 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a$ tail -f ~/logs/rocketmqlogs/broker.log The broker[broker-a, 192.169.1.2:10911] boot success...
1.2 多Master模式
一个集群无S ...
Dledger快速搭建
前言
该文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的
RocketMQ 集群。
详细的新集群部署和旧集群升级指南请参考 部署指南。
1. 源码构建
构建分为两个部分,需要先构建 DLedger,然后 构建 RocketMQ
1.1 构建 DLedger
12345git clone https://github.com/openmessaging/openmessaging-storage-dledger.gitcd openmessaging-storage-dledgermvn clean install -DskipTests
1.2 构建 RocketMQ
1234567git clone https://github.com/apache/rocketmq.gitcd rocketmqgit checkout -b store_with_dledger origin/store_with_dledgermvn -Prelease-all -DskipTests clean install -U
2. 快速部署
在构建成功 ...
1. 订阅与发布
消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
2. 消息顺序
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
全局顺序 对于指定的一个
Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景:性能要求不高,所有的消息严格按照 FIFO
原则进行消息发布和消费的场景
分区顺序 对于指定的一个 Topic,所有消息根据 sharding key
进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。
Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key
是完全不同的概念。 ...
1. 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer
负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker
在实际部署过程中对应一台服务器,每个 Broker
可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的
Broker。Message Queue
用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue
中。ConsumerGroup 由多个Consumer 实例构成。
2. 消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
3. 消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程 ...
前言
该文档主要介绍如何部署自动容灾切换的 RocketMQ-on-DLedger Group。
RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3
个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在
Leader 和 Follower 之间复制数据以保证高可用。 RocketMQ-on-DLedger Group
能自动容灾切换,并保证数据一致。 RocketMQ-on-DLedger Group
是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group
同时对外提供服务。
1. 新集群部署
1.1 编写配置
每个 RocketMQ-on-DLedger Group 至少准备三台机器(本文假设为 3)。
编写 3 个配置文件,建议参考 conf/dledger 目录下的配置文件样例。
关键配置介绍: |name|含义|举例| |-|:–:|-|
|enableDLegerCommitLog|是否启动DLedger|true| |dLegerGroup|DLed ...