代码编织梦想

引言

在Nacos属于集群时,当服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。

// DistroClientDataProcessor
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

同步时,会涉及到一个负责节点和非负责节点

负责节点(发起同步)

也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件

// DistroClientDataProcessor
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
    return;
}

DistroProtocol

Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议;

DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改,对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看

// DistroProtocol
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

在调用syncToTarget后,会触发任务DistroDelayTaskProcessor处理任务,这是Distro协议的一个默认延迟任务处理器,可以看到。 对于删除类型的任务,触发任务DistroSyncDeleteTask , 对于删除的任务:DistroSyncChangeTask

public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    @Override
    public boolean process(NacosTask task) {
        
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        switch (distroDelayTask.getAction()) {
            case DELETE:
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }
}

DistroSyncChangeTask

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    ...
    
    // 无回调
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }
    
    // 有回调
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        //将得到的数据同步给其他服务节点
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    
    // 从DistroClientDataProcessor获取DistroData
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

获取同步数据getDistroData

这里获取同步数据其实是从DistroClientDataProcessor 中获取的,所以为Client的相关注册服务信息
// DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor
@Override
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}
可以看到generateSyncData 方法是关键获取服务的方法,该方法提供了同步数据,包含Client的注册信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。
// AbstractClient implements Client 
@Override
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

执行同步数据syncData

这里的同步实际是由DistroClientTransportAgent来负责的,将数据分装成DistroDataRequest 然后查询到对于的服务节点Member然后调用asyncRequest异步方法执行同步,后面的方法我就不跟了, 这时我们主要关注非负责节点收到同步请求后如何处理。
// DistroClientTransportAgent
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    try {
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

非负责节点(接收请求)

当负责节点将数据发送给非负责节点以后,将要处理发送过来的Client数据。通过DistroController收到数据后, 然后最终会DistroClientDataProcessor.processData方法来进行处理

// DistroController.java
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
    ...
    DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
    distroProtocol.onReceive(distroHttpData);
    ...
}
// DistroClientDataProcessor.java
@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

可以看出,这里分别对ADD/CHANGE和DELETE进行了处理,这里我主要关注ADD/CHANGE,所以主要关注handlerClientSyncData方法。

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    // 同步客户端连接,此时如果客户端不存在,则会注册一个非负责节点client,后面就会获取到该客户端操作
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    // 获取Client(此时注册到的是ConnectionBasedClient)
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 更新Client数据
    upgradeClient(client, clientSyncData);
}

**注意:**这里要注意下此时的Client实现类ConnectionBasedClient,它的isNative属性为false,这是非负责节点和负责节点的主要区别。

     其实判断当前nacos节点是否为负责节点的依据就是这个**isNative属性**,如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient,它的isNative属性为true;如果是由Distro协议,同步到这个nacos节点上的ConnectionBasedClient,它的isNative属性为false。

     那其实我们都知道2.x的版本以后使用了长连接,所以**通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求**。

DistroClientDataProcessor的upgradeClient方法,更新Client里的注册表信息,发布对应事件

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

Distro协议负责集群数据统一

Distro为了确保集群间数据一致,不仅仅依赖于数据发生改变时的实时同步,后台有定时任务做数据同步。

在1.x版本中,责任节点每5s同步所有Service的Instance列表的摘要(md5)给非责任节点,非责任节点用对端传来的服务md5比对本地服务的md5,如果发生改变,需要反查责任节点。

在2.x版本中,对这个流程做了改造,责任节点会发送Client全量数据,非责任节点定时检测同步过来的Client是否过期,减少1.x版本中的反查。

责任节点每5s向其他节点发送DataOperation=VERIFY类型的DistroData,来维持非责任节点的Client数据不过期。

// DistroVerifyTimedTask.java
@Override
public void run() {
    // 所有其他节点
    List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        // 遍历这些节点发送Client.isNative=true的DistroData,type = VERIFY
        verifyForDataStorage(each, targetServer);
    }
}

非责任节点每5s扫描isNative=false的client,如果client 30s内没有被VERIFY的DistroData更新过续租时间,会删除这个同步过来的Client数据。

//ConnectionBasedClientManager->ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && client.isExpire(currentTime)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
} 

// ConnectionBasedClient.java
@Override
public boolean isExpire(long currentTime) {
    // 判断30s内没有续租 认为过期
    return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/chyh741/article/details/123786412

nacos数据一致性_liyanan21的博客-爱代码爱编程_nacos数据一致性

目录 一、Raft算法 二、Nacos中Raft部分源码 init() 1. 获取Raft集群节点  NamingProxy.getServers()获取集群节点 NamingProxy.refreshSrvIfNeed()得到节点信息 NamingProxy.refreshServerListFromDisk()获取集群节点信息 2. R

Spring Cloud 2.2.2 源码之六十nacos数据一致性原理之临时结点数据同步一-爱代码爱编程

Spring Cloud 2.2.2 源码之六十nacos数据一致性原理之临时结点数据同步一 nacos数据一致性服务执行流程数据一致性TaskScheduler的run临时数据同步DataSyncer的submit nacos数据一致性服务执行流程 数据一致性 其实nacos内部提供两种数据同步方案AP和CP,而且是混用的,只要你的实

史上最强的Nacos集群HA高可用解决方案-爱代码爱编程

1、Nacos支持单机服务器部署,可以用MYSQL数据库取代默认的嵌入式数据库Derby 2、Nacos集群,集群的目的是为了高可用HA(High Availability),即服务注册发现和配置的获取能够顺利进行。官网推荐的集群方案如图: 这十分搞笑,VIP是啥?我暂时看来是个Nacos服务的请求转发工具吧,这用Nginx就很容易实现服务分发。上面的3台

Nacos源码分析之Raft协议(三)----数据的同步-爱代码爱编程

前言 上两篇文章主要分析了leader选举以及发送心跳包的代码,如果还有疑问的小伙伴,可以点击传送门再去温习一下。 leader选举源码分析leader发送心跳源码分析 接下来进入正题,今天我们主要是分析上篇文章没有说完的数据同步相关的代码。 Nacos数据的同步 以服务实例的数据同步为例,数据同步的主要核心的地方在于ConsistencySer

日志输出到文件nacos 配置_Nacos 常见问题及解决方法-爱代码爱编程

Nacos 开源至今已有一年,在这一年里,得到了很多用户的支持和反馈。在与社区的交流中,我们发现有一些问题出现的频率比较高,为了能够让用户更快的解决问题,我们总结了这篇常见问题及解决方法,这篇文章后续也会合并到 Nacos 官网的 FAQ 里。 如何依赖最新的 Nacos 客户端? 很多用户都是通过 Spring Cloud Alibaba 或者

Nacos集群搭建+数据持久化-爱代码爱编程

Nacos简介 前四个字母分别为Naming和Configuration的前两个字母,最后的s为Service,一个更易于构建云原生应用的动态服务发现,配置管理和服务管理中心,Nacos:Dynamic Naming and Configuration Service,Nacos就是注册中心+配置中心的组合,等价于Nacos = Eureka+Confi

Nacos 集群部署模式最佳实践-爱代码爱编程

1 前言 Nacos 支持两种部署模式:单机模式和集群模式。在实践中,我们往往习惯用单机模式快速构建一个 Nacos 开发/测试环境,而在生产中,出于高可用的考虑,一定需要使用 Nacos 集群部署模式。我的上一篇文章《一文详解 Nacos 高可用特性》提到了 Nacos 为高可用做了非常多的特性支持,而这些高可用特性大多数都依赖于集群部署模式

soul源码学习(九)-nacos数据同步-爱代码爱编程

文章目录 分析前准备环境配置nacos启动单实例数据同步过程源码分析 分析前准备 环境配置 soul-admin配置soul: database: dialect: mysql init_script: "META-INF/schema.sql" init_enable: true sync: nac

Nacos配置中心数据同步原理-爱代码爱编程

Nacos配置中心数据同步原理 一、Nacos概览 Nacos 能帮助我们发现、配置和管理微服务,提供了一组简单易用的特性集,帮助我们快速实现动态服务发现、服务配置、服务元数据及流量管理。 关键特性 服务发现和服务健康监测动态配置服务动态 DNS 服务服务及其元数据管理持久化 默认情况下,Nacos使用嵌入式数据库实现数据的存储。所以,如果启动多

linux 集群数据同步,NacosSync多集群迁移-爱代码爱编程

NacosSync迁移 环境介绍 80为 A机房,175为 B机房 假设所有服务都连接到B机房175的集群上,现要将B机房的数据同步到A机房80集群上 目前已支持的同步类型:Nacos数据同步到Nacos Zookeeper数据同步到Nacos Nacos数据同步到Zookeeper Eureka数据同步到Nacos Consul数据同

Nacos源码集群数据同步-爱代码爱编程

在DistroConsistencyServiceImpl的put方法中分为两步:   其中的onPut方法已经分析过了。 下面的distroProtocol.sync()就是集群同步的逻辑了。 DistroProtocol类的sync方法如下: public void sync(DistroKey distroKey, DataOp

nacos集群 raft协议下数据同步原理(cp模式)-爱代码爱编程

nacos集群在选举之后,其他节点需要从leader节点同步数据,leader会在心跳间隔时间的时候会给其他节点发送数据,心跳间隔大概几百毫秒。 如果客户端发来数据,nacos集群两阶段提交;首先客户端发送的请求需要转交给leader处理,leader第一步将数据写入到文件中,源码中raftCore中onPublish方法,然后调用write,写入到na

Nacos服务注册-爱代码爱编程

前言 Nacos核心功能点 服务注册:Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。 Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。 服务心跳:在服务注册后,Nacos Client会维护一个定时心跳来持续通知Na

nacos源码系列——第三章(全网最经典的nacos集群源码主线剖析)_nacos里的responsible方法_风清扬逍遥子的博客-爱代码爱编程

        上两个章节讲述了Nacos在单机模式下的服务注册,发现等源码剖析过程,实战当中 其实单机是远远不够的,那么Nacos是如何在集群模式下是如何保证节点状态同步,以及服 务变动,新增数据同步的过程的!         重要几个点:         1、Nacos心跳在集群架构下的设计原理剖析         2、Nac