RocketMQ Namesrv架構工作原理詳解

1 概念

Namesrv的作用是保存元數據提高Broker的可用性

Namesrv的主要功能是臨時存儲管理Topic路由信息,各個Namesrv節點之間是不通信無狀態的,互相不知道對方的存在。

當Broker,生產者,消費者啟動的時候,會輪詢全部的Namesrv節點,獲取路由信息。

2 核心數據結構和API

2.1 Namesrv的核心數據結構

Namesrv中保存的信息是Topic的路由信息,Topic的路由決定瞭Topic的信息發送給哪些Broker,或者從哪些Broker獲取消息。

路由數據結構的實現代碼都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //Broker存活的時間周期,默認120秒
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    //保存Topic和隊列的路由信息
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    //Broker名字和Broker信息的對應信息
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    //集群和Broker的對應關系
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //在線的Broker地址和Broker信息的對應關系
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //過濾服務器消息
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
    private final BatchUnRegisterService unRegisterService;
    private final NamesrvController namesrvController;
    private final NamesrvConfig namesrvConfig;

2.2 Namesrv的API

Namesrv的的API在org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor中,根據方法名很容易判斷出來方法的作用。

switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.QUERY_DATA_VERSION:
        return this.queryBrokerTopicConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        //Broker註冊自身信息到Namesrv
        return this.registerBroker(ctx, request);
    case RequestCode.UNREGISTER_BROKER:
        //Broker取消註冊自身信息到Namesrv
        return this.unregisterBroker(ctx, request);
    case RequestCode.BROKER_HEARTBEAT:
        return this.brokerHeartbeat(ctx, request);
    case RequestCode.GET_BROKER_MEMBER_GROUP:
        return this.getBrokerMemberGroup(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.ADD_WRITE_PERM_OF_BROKER:
        return this.addWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        return this.getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        return this.deleteTopicInNamesrv(ctx, request);
    case RequestCode.REGISTER_TOPIC_IN_NAMESRV:
        return this.registerTopicToNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        return this.getTopicsByCluster(ctx, request);
    case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
        return this.getSystemTopicListFromNs(ctx, request);
    case RequestCode.GET_UNIT_TOPIC_LIST:
        return this.getUnitTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
        return this.getHasUnitSubTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    case RequestCode.GET_CLIENT_CONFIG:
        return this.getClientConfigs(ctx, request);
    default:
        String error = " request type " + request.getCode() + " not supported";
        return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}

3 Namesrv架構

下圖是一個消息的常規流轉過程,生產者,消費者,Broker通過與Namesrv交換信息來實現自己的功能。

3.1組件

  • Broker

Broker在啟動的時候,將自己的元數據信息,上報給Namesrv,這部分信息也就是Topic路由。

這裡的元數據包含Broker本身的元數據和該Broker中Topic的信息。

  • 生產者

生產者隻關註Topic路由,從namesrv獲取到Topic路由後就可以知道這個Topic的消息存放到瞭哪些Broker中。

  • 消費者

消費者也隻關註Topic路由,從namesrv獲取到獲取到Topic路由之後,才能知道自己訂閱的Topic的Broker地址,從而獲取消息。

3.2 Namesrv四個功能模塊

  • Topic功能管理模塊

這是Namesrv最核心的模塊,Topic路由決定,Topic的數據會保存在哪些Broker上。Broker啟動的時候,會將自身的信息註冊到Namesrv中,以供消費者和生產者獲取。生產者和消費者與Namesrv之間會有心跳通信,從而獲取最新的Broker信息。

  • Remoting通信模塊

這個模塊是基於Netty的網絡通信封裝,擔任各個組件之間的網絡通信任務。

  • 定時任務模塊

定時任務模塊包括:定時掃描宕機的Broker,定時打印KV配置,定時掃描超時請求。

  • KV管理模塊

Namesrv維護瞭一個全局的KV配置魔窟啊,方便全局配置。

以上就是RocketMQ Namesrv架構工作原理詳解的詳細內容,更多關於RocketMQ Namesrv架構的資料請關註WalkonNet其它相關文章!

推薦閱讀: