11Nacos配置中心之客户端长轮询

网友投稿 1168 2022-05-29

11Nacos配置中心之客户端长轮询

createConfigService

NacosConfigService构造

ClientWorker构造

checkConfigInfo()方法:

LongPollingRunnable

checkUpdateDataIds()方法

getServerConfig

总结

11Nacos配置中心之客户端长轮询

createConfigService

NacosConfigService构造

ClientWorker构造

checkConfigInfo()方法:

LongPollingRunnable

checkUpdateDataIds()方法

getServerConfig

总结

11Nacos配置中心之客户端长轮询

11Nacos配置中心之客户端长轮询

客户端长轮询定时任务是在NacosFactory的createConfigService构建ConfigService对象实例的时候启动的

createConfigService

public static ConfigService createConfigService(String serverAddr) throws NacosException { return ConfigFactory.createConfigService(serverAddr); }

public class ConfigFactory { /** * Create Config * * @param properties init param * @return ConfigService * @throws NacosException Exception */ public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } } /** * Create Config * * @param serverAddr serverList * @return Config * @throws ConfigService Exception */ public static ConfigService createConfigService(String serverAddr) throws NacosException { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); return createConfigService(properties); } }

通过Class.forName加载NacosConfigService类

使用反射来完成NacosConfigService类的实例化

NacosConfigService构造

NacosConfigService构造方法:

public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty("encode"); if (StringUtils.isBlank(encodeTmp)) { this.encode = "UTF-8"; } else { this.encode = encodeTmp.trim(); } this.initNamespace(properties); this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }

它的构造方法中:

初始化HttpAgent,使用了装饰器模式,实际工作的类是ServerHttpAgent,MetricsHttpAgent内部也调用了ServerHttpAgent的方法,增加监控统计信息

ClientWorker是客户端的工作类,agent作为参数传入ClientWorker,用agent做一些远程调用

ClientWorker构造

ClientWorker的构造函数:

@SuppressWarnings("PMD.ThreadPoolCreationRule") public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter init(properties); executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }

构造方法中:

构建定时调度的线程池,第一个线程池executor只拥有一个核心线程,每隔10s执行一次checkConfigInfo()方法,功能就是每10ms检查一次配置信息

第二个线程池executorService只完成了初始化,后续用于客户端的定时长轮询功能。

checkConfigInfo()方法:

public void checkConfigInfo() { int listenerSize = cacheMap.get().size(); int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }

这个方法的主要功能就是检查配置信息是否发送变化,

获取监听个数

分配长轮询任务数,向上取整

判断长轮询任务数是否比当前长轮询任务数大,如果大的话创建指定就创建线程达到所需的任务数的线程数量,如果不比当前任务数就把求得长轮询任务数赋值给当前长轮询任务数

cacheMap用来存储监听变更的缓存集合,key是根据dataID/group/tenant拼接的值。Value是对应的存储在Nacos服务器上的配置文件的内容。

默认情况下每个长轮询LongPollingRunnable任务处理3000个监听配置集,超过3000个启动多个LongPollingRunnable执行。

LongPollingRunnable

LongPollingRunnable是一个线程,我们可以直接找到LongPollingRunnable里面的run方法

class LongPollingRunnable implements Runnable { private int taskId; public LongPollingRunnable(int taskId) { this.taskId = taskId; } @Override public void run() { List cacheDatas = new ArrayList(); List inInitializingCacheList = new ArrayList(); try { // check failover config for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // check server config List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } }

LongPollingRunnable类的run()方法中:

遍历CacheData,检查本地配置,根据taskId对cacheMap进行数据分割,通过checkLocalConfig方法检查本地配置,本地在${user}\naocs\config\目录下缓存一份服务端的配置信息,checkLocalConfig将内存中的数据和本地磁盘数据比较,不一致说明数据发生了变化,需要触发事件通知。

执行checkUpdateDataIds方法在服务端建立长轮询机制,通过长轮询检查数据变更。

遍历变更数据集合changedGroupKeys,调用getServerConfig方法,根据dataId,group,tenant去服务端读取对应的配置信息并保存到本地文件中。

继续定时执行当前线程

checkUpdateDataIds()方法

checkUpdateDataIds()方法基于长连接方式监听服务端配置的变化,最后根据变化数据的key去服务端获取最新数据。

checkUpdateDataIds中调用checkUpdateConfigStr

/** * */ List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List headers = new ArrayList(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try { HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }

这个方法的作用就是从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的,保证不返回NULL

checkUpdateConfigStr()方法中通过agent.httpPost调用/v1/cs/configs/listener接口实现长轮询请求。长轮询请求是实现层面只是设置了一个比较长的超时时间,默认30s。如果服务端的数据发生变更,客户端会收到HttpResult。服务端返回的是存在数据变更的dataId, group, tenant。获得这些信息后,在LongPollingRunnable的run方法中调用getServerConfig方法从Nacos服务器中读取具体的配置内容。

getServerConfig

从Nacos服务器中读取具体的配置内容:

public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpResult result = null; try { List params = null; if (StringUtils.isBlank(tenant)) { params = Arrays.asList("dataId", dataId, "group", group); } else { params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (IOException e) { String message = String.format( "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, e); throw new NacosException(NacosException.SERVER_ERROR, e); } switch (result.code) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content); return result.content; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return null; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, " + "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.code, result.content); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code); throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }

总结

现在我们知道Nacos配置中心的客户端做了哪些事了,客户端创建NacosConfigService实例,它的构造方法中创建了ClientWorker对象,ClientWorker中就设定了定时线程每隔10秒执行一次checkConfigInfo()方法来检查配置信息是否变更,使用的线程是LongPollingRunnable,它的run()方法中的逻辑就是调用checkUpdateDataIds()方法检查是否数据变更,本质是调用服务端的/v1/cs/configs/listener接口来实现的

任务调度

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:20 种高效的工作方式,助力你更有效率的编码
下一篇:《企业级容器云架构开发指南》—1.2 虚拟化热点技术与终极目标
相关文章