Nacos源碼閱讀方法
為什麼我會經常閱讀源碼呢,因為閱讀源碼能讓你更加接近大佬,哈哈,這是我瞎扯的。
這篇文章將會帶大傢閱讀Nacos源碼 以及 教大傢閱讀源碼的技巧,我們正式開始吧!
先給大傢獻上一張我梳理的高清源碼圖,方便大傢對nacos的源碼有一個整體上的認識。
有瞭這張圖,我們就很容易去看nacos源碼瞭。
如何找切入點
首先我們得要找一個切入點進入到nacos源碼中,那麼就從nacos依賴入手
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>
進入這個依賴文件,會發現它又依賴瞭一個組件:
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId> </dependency>
進入依賴之後,我們發現它長這樣:
從這張圖中,我們發現瞭一個熟悉的配置文件spring.factories,這是sringboot自動裝配的必備文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
因為這張主要說的是服務註冊源碼,所以我們可以隻用關註(NacosServiceRegistryAutoConfiguration)自動裝配文件
public class NacosServiceRegistryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(nacosDiscoveryProperties, context); public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); }
我們看到的是三個bean註入,這裡給大傢介紹一個看源碼的小技巧:自動裝配的文件中申明的bean類,我們隻需要看帶有auto的bean,這個往往是入口;NacosAutoServiceRegistration 帶有auto,我們點進去看看裡面都有什麼:
@Override protected void register() { if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) { log.debug("Registration disabled."); return; } if (this.registration.getPort() < 0) { this.registration.setPort(getPort().get()); } super.register(); }
裡面有一個register()方法,我在這裡打個斷點,因為我猜測這個就是註冊的入口,我現在使用debug模式,啟動一個服務,看它會不會調用這個方法:
客戶端註冊
這裡貼上我debug後,進入register方法的調用鏈截圖
看到這個調用鏈,看到一個onApplicationEvent的回調方法,找到這個方法所在的類AbstractAutoServiceRegistration
這個類繼承瞭ApplicationListener這個多播器監聽器,spring啟動之後,會發佈多播器事件,然後回調實現多播器組件的onApplicationEvent方法,我們從這個方法開始分析:
public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); // 綁定端口,並啟動 } @Deprecated public void bind(WebServerInitializedEvent event) { // 設置端口 this.port.compareAndSet(0, event.getWebServer().getPort()); // 啟動客戶端註冊組件 this.start(); } public void start() { // 省略分支代碼 // 調用註冊 register(); }
因為springcloud提供瞭多種註冊中心擴展,但是我們這裡隻引用瞭nacos註冊中心,所以這裡直接調用的是NacosServiceRegistry的register方法:
public void register(Registration registration) { // 省略分支代碼 // 獲取服務id String serviceId = registration.getServiceId(); // 獲取組配置 String group = nacosDiscoveryProperties.getGroup(); // 封裝服務實例 Instance instance = getNacosInstanceFromRegistration(registration); // 調用 命名服務的 registerInstance方法 註冊實例 namingService.registerInstance(serviceId, group, instance); }
進入到registerInstance方法
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { // 省略分支代碼 // 與服務端建立心跳,默認每隔5秒定時發送新跳包 this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } // 通過http方式向服務端發送註冊請求 this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
serverproxy通過調用對http進行封裝的reapi方法,向服務端接口("/nacos/v1/ns/instance")發送請求,
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance}); Map<String, String> params = new HashMap(9); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("clusterName", instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JSON.toJSONString(instance.getMetadata())); this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST"); }
我們知道nacos經常是以集群形式部署的,那客戶端是如何選擇其中一個節點發送呢,肯定得實現負載均衡的邏輯,我們點擊reqAPI,看它是如何實現的
if (servers != null && !servers.isEmpty()) { Random random = new Random(System.currentTimeMillis()); // 隨機獲取一個索引,servers保存的是所有nacos節點地址 int index = random.nextInt(servers.size()); // 遍歷所有節點,根據index值,從servers中找到對應位置的server,進行請求調用,如果調用成功則返回,否則依次往後遍歷,直到請求成功 for(int i = 0; i < servers.size(); ++i) { String server = (String)servers.get(index); try { return this.callServer(api, params, server, method); } catch (NacosException var11) { exception = var11; LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11); } catch (Exception var12) { exception = var12; LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12); } // index+1 然後取模 是保證index不會越界 index = (index + 1) % servers.size(); } throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage()); }
到這裡,客戶端註冊的代碼已經分析完瞭,不過這還不是本篇的結束,我們還得繼續分析服務端是如何處理客戶端發送過來的註冊請求:
服務端處理客戶端註冊請求
如果需要查看服務端源碼的話,則需要將nacos源碼下下來 下載地址
我們從服務註冊api接口地址(/nacos/v1/ns/instance),可以找到對應的controller為(com.alibaba.nacos.naming.controllers.InstanceController)
因為註冊實例發送的是post請求,所以直接找被postmapping註解的register方法
@CanDistro @PostMapping public String register(HttpServletRequest request) throws Exception { // 獲取服務名 String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); // 獲取命名空間id String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 註冊實例 serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request)); return "ok"; }
我們點擊進入到registerInstance方法:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 執行添加實例的操作 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
分析
在nacos中,註冊實例後,還需要將註冊信息同步到其他節點,所有在nacos中存在兩種同步模式AP和CP,ap和cp主要體現在集群中如何同步註冊信息到其它集群節點的實現方式上;
nacos通過ephemeral 字段值來決定是使用ap方式同步還是cp方式同步,默認使用的的ap方式同步註冊信息。com.alibaba.nacos.naming.core.ServiceManager.addInstance()
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 生成服務的key String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 獲取服務 Service service = getService(namespaceId, serviceName); // 使用同步鎖處理 synchronized (service) { List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 調用consistencyService.put 處理同步過來的服務 consistencyService.put(key, instances); } }
我們在進入到consistencyService.put方法中
點擊put方法時,會看到有三個實現類,根據上下文(或者debug方式),可以推斷出這裡引用的是DelegateConsistencyServiceImpl實現類
@Override public void put(String key, Record value) throws NacosException { // 進入到這個put方法後,就可以知道應該使用ap方式同步還是cp方式同步 mapConsistencyService(key).put(key, value); }
從下面的方法中 可以判斷通過key來判斷使用ap還是cp來同步註冊信息,其中key是由ephemeral字段組成;
private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }
AP 方式同步的流程(ephemeralConsistencyService) 本地服務器處理註冊信息&將註冊信息同步到其它節點
@Override public void put(String key, Record value) throws NacosException { // 處理本地註冊列表 onPut(key, value); // 添加阻塞任務,同步信息到其他集群節點 taskDispatcher.addTask(key); }
處理本地註冊節點
nacos將key做為一個task,添加到notifer中阻塞隊列tasks中,並且使用單線程執行,其中notifer是初始化的時候,作為一個線程被放到線程池中(線程池隻設置瞭一個核心線程);
這裡有一個點需要告訴大傢:在大多數分佈式框架,都會采用單線程的阻塞隊列來處理耗時的任務,一方面解決並發問題,另一方面能夠解決並發帶來的寫寫沖突問題。
線程中的主要處理邏輯就是,循環讀取阻塞隊列中的內容,然後處理註冊信息,更新到內存註冊列表中。
同步註冊信息到其他集群節點
nacos同樣也是把註冊key作為一個task存放到 TaskDispatcher 中的taskShedule阻塞隊列中,然後開啟線程循環讀取阻塞隊列:
@Override public void run() { List<String> keys = new ArrayList<>(); while (true) { String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS); // 省略判斷代碼 // 添加同步的key keys.add(key); // 計數 dataSize++; // 判斷同步的key大小是否等於 批量同步設置的限量 或者 判斷據上次同步時間 是否大於 配置的間隔周期,如果滿足任意一個,則開始同步 if (dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) { // 遍歷所有集群節點,直接調用http進行同步 for (Server member : dataSyncer.getServers()) { if (NetUtils.localServer().equals(member.getKey())) { continue; } SyncTask syncTask = new SyncTask(); syncTask.setKeys(keys); syncTask.setTargetServer(member.getKey()); if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) { Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask)); } dataSyncer.submit(syncTask, 0); } // 記錄本次同步時間 lastDispatchTime = System.currentTimeMillis(); // 計數清零 dataSize = 0; } } } }
使用ap方式作同步的過程很簡單,但是這裡面有兩種設計思路來解決單個key同步的問題:
如果有新的key推送上來,nacos就發起一次同步,這會造成網絡資源浪費,因為每次同步的就隻有一個key或者幾個key;
同步少量的key解決方案: 隻有積累到指定數量的key,才發起批量同步距離上次同步時間超過配置的限制時間,則忽略key數量,直接發起同步 CP 方式同步的流程(RaftConsistencyServiceImpl)
cp模式追求的是數據一致性,為瞭數據一致性,那麼肯定得選出一個leader,由leader首先同步,然後再由leader通知follower前來獲取最新的註冊節點(或者主動推送給follower)
nacos使用raft協議來進行選舉leader,來實現cp模式。
同樣進入到 RaftConsistencyServiceImpl的put方法
@Override public void put(String key, Record value) throws NacosException { try { raftCore.signalPublish(key, value); } catch (Exception e) { Loggers.RAFT.error("Raft put failed.", e); throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e); } }
進入到raftCore.signalPublish方法中,我提取幾個關鍵的代碼
// 首先判斷當前nacos節點是否是leader,如果不是leader,則獲取leader節點的ip,然後將請求轉發到leader處理,否則往下走 if (!isLeader()) { JSONObject params = new JSONObject(); params.put("key", key); params.put("value", value); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters); return; }
同樣采用同樣隊列的方式,去處理本地註冊列表
onPublish(datum, peers.local()); public void onPublish(Datum datum, RaftPeer source) throws Exception { // 添加同步key任務到阻塞隊列中 notifier.addTask(datum.key, ApplyAction.CHANGE); Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term); }
遍歷所有集群節點,發送http同步請求
for (final String server : peers.allServersIncludeMyself()) { // 如果是leader,則不進行同步 if (isLeader(server)) { latch.countDown(); continue; } // 組裝url 發送同步請求到其它集群節點 final String url = buildURL(server, API_ON_PUB); HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, response.getStatusCode()); return 1; } latch.countDown(); return 0; } @Override public STATE onContentWriteCompleted() { return STATE.CONTINUE; } }); }
到此,nacos服務註冊及服務實例同步的主幹源碼已經分析完瞭。
總結
對於剛開始接觸nacos源碼的同學,可以先把頭上的圖多看幾遍,然後對照著源碼找到對應的位置 ,最後結合圖再結合本文,整體連貫的看下來,相信會有很大收獲的;雖然閱讀源碼的過程很痛苦,但是你隻要堅持下來瞭,掌握到瞭閱讀源碼的技巧,你就會發現再難的源碼,你都能把它啃下來;後面我會專門寫一篇教你如何高效閱讀源碼的文章,希望對於剛接觸源碼的同學能有所幫助。
到此這篇關於Nacos源碼閱讀方法的文章就介紹到這瞭,更多相關Nacos源碼閱讀內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- SpringBoot實現服務接入nacos註冊中心流程詳解
- SpringBoot2 整合Nacos組件及環境搭建和入門案例解析
- Nacos集群模式下服務無法註冊問題
- SpringCloud 服務註冊中的nacos實現過程
- SpringBoot如何整合nacos詳解