ZooKeeper Source Code (10): Node Create, Delete, Update, Read, and Watches

2024-03-15

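This article starts from the leader's request processors and walks through how nodes are created, deleted, updated, and read, and how watches work.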

Recap of the data read/write flow

leader

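  1. ZooKeeperServer.processPacket wraps the packet in a Request and submits it to the processor chain
  2. LeaderRequestProcessor performs the local session upgrade when one is needed
  3. PrepRequestProcessor prepares the transaction
  4. ProposalRequestProcessor sends a proposal to the followers for transactional operations and persists the transaction to the log file
  5. CommitProcessor forwards read requests straight downstream; transactional operations are forwarded once a quorum has acknowledged them
  6. ToBeAppliedRequestProcessor cleans up the toBeApplied collection
  7. FinalRequestProcessor applies the transaction to ZKDatabase and sends the response to the client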

follower

  1. On PROPOSAL: SyncRequestProcessor persists the transaction, then SendAckRequestProcessor sends an ack to the leader
  2. On COMMIT: the request is handed to CommitProcessor, then FinalRequestProcessor applies the transaction to ZKDatabase
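
The leader pipeline above is a chain of responsibility: each processor does its step and hands the request to the next one. The following is only a minimal sketch of that shape, not ZooKeeper's own classes. `ChainDemo`, `Stage`, and `NamedStage` are hypothetical names, and each stage merely records that it was visited.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainDemo {
    /** Hypothetical stand-in for a request processor: do one step, then forward. */
    interface Stage {
        void process(String request, List<String> trace);
    }

    /** Records its name in the trace and forwards to the next stage, if any. */
    static final class NamedStage implements Stage {
        private final String name;
        private final Stage next; // null marks the end of the chain

        NamedStage(String name, Stage next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void process(String request, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.process(request, trace);
            }
        }
    }

    /** Builds the leader chain back to front, in the order of the numbered list. */
    static Stage leaderChain() {
        String[] names = {
            "LeaderRequestProcessor", "PrepRequestProcessor", "ProposalRequestProcessor",
            "CommitProcessor", "ToBeAppliedRequestProcessor", "FinalRequestProcessor"
        };
        Stage next = null;
        for (int i = names.length - 1; i >= 0; i--) {
            next = new NamedStage(names[i], next);
        }
        return next;
    }

    /** Sends one request through the chain and returns the stages it visited. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        leaderChain().process(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create /a"));
    }
}
```

Building the chain from the last stage backwards lets every stage receive its successor in the constructor, so the links can stay final.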

Creating a node

This covers the create, create2, createContainer, and createTTL commands.

PrepRequestProcessor: transaction preparation

Deserializing the request parameters

switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
    CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
    break;
case OpCode.createTTL:
    // TTL nodes are not supported by default
    CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
    pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
    break;
// ...

CreateRequest carries the parameters for creating a node:

public class CreateRequest implements Record {
  private String path;
  private byte[] data;
  private java.util.List<org.apache.zookeeper.data.ACL> acl;
  private int flags;
}

CreateTTLRequest carries the parameters for creating a node with a TTL:

public class CreateTTLRequest implements Record {
  private String path;
  private byte[] data;
  private java.util.List<org.apache.zookeeper.data.ACL> acl;
  private int flags;
  private long ttl;
}

Preparing the transaction

protected void pRequest2Txn(int type, long zxid, Request request, Record record)
        throws KeeperException, IOException, RequestProcessorException {
    // ...
    switch (type) {
      case OpCode.create:
      case OpCode.create2:
      case OpCode.createTTL:
      case OpCode.createContainer: {
        pRequest2TxnCreate(type, request, record);
        break;
      }
    // ...
    }
}
private void pRequest2TxnCreate(
        int type, Request request, Record record) throws IOException, KeeperException {
    int flags;
    String path;
    List<ACL> acl;
    byte[] data;
    long ttl;
    if (type == OpCode.createTTL) {
        CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
        // assign flags and the other parameters
    } else {
        CreateRequest createRequest = (CreateRequest) record;
        // assign flags and the other parameters
        ttl = -1;
    }
    // CreateMode:
    // PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL,
    // CONTAINER, PERSISTENT_WITH_TTL, PERSISTENT_SEQUENTIAL_WITH_TTL
    CreateMode createMode = CreateMode.fromFlag(flags);
    // validate the ephemeral and TTL parameters and check the session
    // TTL nodes are not supported by default
    validateCreateRequest(path, createMode, request, ttl);
    String parentPath = validatePathForCreate(path, request.sessionId); // parent node path
    List<ACL> listACL = fixupACL(path, request.authInfo, acl); // ACL carried by the request
    ChangeRecord parentRecord = getRecordForPath(parentPath); // look up the parent node
    // verify the CREATE permission
    zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) { // sequential node
        // e.g. /users/admin0000000001
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    // omitted
    boolean ephemeralParent =
        EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
    if (ephemeralParent) { // the parent must not be an ephemeral node
        throw new KeeperException.NoChildrenForEphemeralsException(path);
    }
    int newCversion = parentRecord.stat.getCversion() + 1; // bump the parent's cversion (child version)
    // check the byte quota
    zks.checkQuota(path, null, data, OpCode.create);
    // create the Txn object that matches the request type
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
    }
    TxnHeader hdr = request.getHdr();
    long ephemeralOwner = 0;
    if (createMode.isContainer()) {
        ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
    } else if (createMode.isTTL()) {
        ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
    } else if (createMode.isEphemeral()) {
        ephemeralOwner = request.sessionId; // ephemeral nodes use the sessionId as owner
    }
    // czxid(created),mzxid(modified),ctime,mtime,version,cversion(childVersion),
    // aversion(aclVersion),ephemeralOwner,pzxid(lastModifiedChildren)
    StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
    // parent node
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord);
    // the new node
    ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
    nodeRecord.data = data;
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord);
}
protected void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}

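outstandingChanges holds the changes of transactions that have not been committed yet. Generating a sequential node name, for example, needs the parent's cversion, but until the transaction is applied to ZKDatabase the value stored there is stale. That is why the code above reads the parent through getRecordForPath, which consults the outstanding changes first, uses that cversion for the sequential suffix, and then records the incremented cversion as a new change record.

outstandingChanges is cleaned up only after the transaction has been applied.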
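
To make the role of the pending view concrete, here is a small sketch under simplifying assumptions: only the parent's cversion is tracked, and `PendingChangesDemo` with its two maps is a hypothetical model rather than ZooKeeper code. Two sequential names are prepared before anything is committed, and the second still receives a fresh suffix because it reads the pending value.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class PendingChangesDemo {
    // Committed state as the database would see it: path -> cversion.
    private final Map<String, Integer> committed = new HashMap<>();
    // Prepared but not yet applied changes: path -> cversion.
    private final Map<String, Integer> pending = new HashMap<>();

    PendingChangesDemo(String parent, int cversion) {
        committed.put(parent, cversion);
    }

    /** Prefers the pending view and falls back to the committed state. */
    int currentCversion(String parent) {
        Integer p = pending.get(parent);
        return p != null ? p : committed.getOrDefault(parent, 0);
    }

    /** Builds a sequential child path and records the bumped cversion as pending. */
    String prepareSequential(String parent, String prefix) {
        int cversion = currentCversion(parent);
        pending.put(parent, cversion + 1);
        return parent + "/" + prefix + String.format(Locale.ENGLISH, "%010d", cversion);
    }

    /** Applies the pending cversion to the committed state and clears it. */
    void commit(String parent) {
        Integer p = pending.remove(parent);
        if (p != null) {
            committed.put(parent, p);
        }
    }

    public static void main(String[] args) {
        PendingChangesDemo demo = new PendingChangesDemo("/users", 1);
        System.out.println(demo.prepareSequential("/users", "admin")); // /users/admin0000000001
        System.out.println(demo.prepareSequential("/users", "admin")); // /users/admin0000000002
    }
}
```

Without the pending map both calls would read cversion 1 from the committed state and produce the same path.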

ProposalRequestProcessor: sending the proposal

public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) { // sync command path, not covered here
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request); // hand over to the downstream processor
        }
        if (request.getHdr() != null) { // transactional operations are proposed and written to disk
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // write the transaction log to file;
            // AckRequestProcessor then sends the ack to the leader
            syncProcessor.processRequest(request);
        }
    }
}

CommitProcessor: committing the transaction

public void processRequest(Request request) {
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    queuedRequests.add(request); // queue of all requests
    if (needCommit(request)) { // requests that need a commit also go to the write queue
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}

The run method compares the queuedRequests, queuedWriteRequests, and committedRequests queues and forwards read requests and successfully committed requests to the downstream ToBeAppliedRequestProcessor.
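
The ordering rule can be sketched in a few lines. This is a deliberately reduced model, assuming a single session and no threading, so it is not the real run loop. `CommitOrderDemo`, `submit`, `commit`, and `drain` are hypothetical names. Reads are forwarded until an uncommitted write reaches the head of the queue, and everything behind that write waits for its commit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CommitOrderDemo {
    static final class Req {
        final String id;
        final boolean write;

        Req(String id, boolean write) {
            this.id = id;
            this.write = write;
        }
    }

    private final Deque<Req> queued = new ArrayDeque<>(); // arrival order
    private final Set<String> committed = new HashSet<>(); // ids whose commit has arrived

    void submit(String id, boolean write) {
        queued.add(new Req(id, write));
    }

    void commit(String id) {
        committed.add(id);
    }

    /** Returns the requests that may be forwarded downstream right now, in order. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (!queued.isEmpty()) {
            Req head = queued.peek();
            if (head.write && !committed.remove(head.id)) {
                break; // head write is not committed yet, so later requests wait
            }
            queued.poll();
            out.add(head.id);
        }
        return out;
    }

    public static void main(String[] args) {
        CommitOrderDemo p = new CommitOrderDemo();
        p.submit("r1", false);
        p.submit("w1", true);
        p.submit("r2", false);
        System.out.println(p.drain()); // [r1]
        p.commit("w1");
        System.out.println(p.drain()); // [w1, r2]
    }
}
```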

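FinalRequestProcessor: applying the transaction

This processor sits at the end of the chain. It applies transactions to ZKDatabase, serves queries, and returns responses.

applyRequest

This method applies the transaction to ZKDatabase:

private ProcessTxnResult applyRequest(Request request) {
    // apply the transaction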
    ProcessTxnResult rc = zks.processTxn(request);
    // closeSession
    // metrics
    return rc;
}

zks.processTxn handles the session, applies the transaction, and cleans up outstandingChanges. The transaction step is the one worth a closer look.

processTxn

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();
    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create:
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            createNode(
                createTxn.getPath(),
                createTxn.getData(),
                createTxn.getAcl(),
                createTxn.getEphemeral() ? header.getClientId() : 0,
                createTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                null);
            break;
        case OpCode.create2:
            CreateTxn create2Txn = (CreateTxn) txn;
            rc.path = create2Txn.getPath();
            Stat stat = new Stat();
            createNode(
                create2Txn.getPath(),
                create2Txn.getData(),
                create2Txn.getAcl(),
                create2Txn.getEphemeral() ? header.getClientId() : 0,
                create2Txn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.createTTL:
            CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
            rc.path = createTtlTxn.getPath();
            stat = new Stat();
            createNode(
                createTtlTxn.getPath(),
                createTtlTxn.getData(),
                createTtlTxn.getAcl(),
                EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                createTtlTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.createContainer:
            CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
            rc.path = createContainerTxn.getPath();
            stat = new Stat();
            createNode(
                createContainerTxn.getPath(),
                createContainerTxn.getData(),
                createContainerTxn.getAcl(),
                EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                createContainerTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        // ...
        }
    }
    // ...
}

createNode

public void createNode(
        final String path, byte[] data, List<ACL> acl, long ephemeralOwner,
        int parentCVersion, long zxid, long time, Stat outputStat)
        throws NoNodeException, NodeExistsException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = createStat(zxid, time, ephemeralOwner);
    DataNode parent = nodes.get(parentName); // the parent node must exist
    if (parent == null) {
        throw new NoNodeException();
    }
    synchronized (parent) {
        Long acls = aclCache.convertAcls(acl);
        Set<String> children = parent.getChildren();
        if (children.contains(childName)) { // the node must not exist yet
            throw new NodeExistsException();
        }
        nodes.preChange(parentName, parent);
        if (parentCVersion == -1) { // childVersion++
            parentCVersion = parent.stat.getCversion();
            parentCVersion++;
        }
        if (parentCVersion > parent.stat.getCversion()) {
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
        }
        // create the node
        DataNode child = new DataNode(data, acls, stat);
        parent.addChild(childName);
        nodes.postChange(parentName, parent);
        nodeDataSize.addAndGet(getNodeSize(path, child.data));
        nodes.put(path, child); // maintain the NodeHashMap
        // track ephemeral, container, and TTL nodes
        EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.add(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.add(path);
        } else if (ephemeralOwner != 0) {
            HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
            synchronized (list) {
                list.add(path);
            }
        }
        // return the node stat
        if (outputStat != null) {
            child.copyStat(outputStat);
        }
    }
    // now check if its one of the zookeeper node child (omitted)
    // fire NodeCreated watches
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
    // fire NodeChildrenChanged watches on the parent
    childWatches.triggerWatch(
        parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}
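
The core of createNode is the split of the path into parent and child name, two existence checks, and the bump of the parent's cversion. The sketch below reproduces just that part on plain maps. `MiniTreeDemo` and `Node` are hypothetical, data, ACLs, stats, and watches are left out, and the root is assumed to live under the empty path, which is what the `"/"` fallback at the end of createNode suggests.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MiniTreeDemo {
    static final class Node {
        final Set<String> children = new HashSet<>();
        int cversion = 0; // number of changes to the children
    }

    // Flat path -> node map, similar in spirit to the NodeHashMap above.
    private final Map<String, Node> nodes = new HashMap<>();

    MiniTreeDemo() {
        nodes.put("", new Node()); // assumption: the root is stored under ""
    }

    void create(String path) {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        Node parent = nodes.get(parentName);
        if (parent == null) { // the parent must exist
            throw new IllegalStateException("no node: " + parentName);
        }
        if (!parent.children.add(childName)) { // the child must not exist
            throw new IllegalStateException("node exists: " + path);
        }
        parent.cversion++;
        nodes.put(path, new Node());
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    int cversion(String path) {
        return nodes.get(path).cversion;
    }

    public static void main(String[] args) {
        MiniTreeDemo tree = new MiniTreeDemo();
        tree.create("/apps");
        tree.create("/apps/app1");
        System.out.println(tree.exists("/apps/app1") + " " + tree.cversion("/apps")); // true 1
    }
}
```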

Returning the response

case OpCode.create: {
    lastOp = "CREA";
    rsp = new CreateResponse(rc.path); // build the response
    err = Code.get(rc.err); // err from processTxn
    requestPathMetricsCollector.registerRequest(request.type, rc.path);
    break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
    lastOp = "CREA";
    rsp = new Create2Response(rc.path, rc.stat); // build the response
    err = Code.get(rc.err); // err from processTxn
    requestPathMetricsCollector.registerRequest(request.type, rc.path);
    break;
}

Finally the response is sent back to the client through cnxn:

ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
cnxn.sendResponse(hdr, rsp, "response");

EphemeralType

  • VOID
  • NORMAL
  • CONTAINER
  • TTL

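    ephemeralOwner marks a znode as ephemeral and records which session created it. The zookeeper.extendedTypesEnabled property enables extended features such as TTL nodes. "Special bits" of ephemeralOwner indicate which feature is in use, and the remaining bits of ephemeralOwner are feature-specific.

    When zookeeper.extendedTypesEnabled is true, extended types are enabled. An extended ephemeralOwner has all of its high 8 bits set (0xff00000000000000L). The two bytes after the high 8 bits identify the extended feature, and the remaining 5 bytes are defined by that feature and can be used for whatever it needs.

    TTL nodes are currently the only extended feature, with feature value 0. For a TTL node, ephemeralOwner has 0xff in the high 8 bits, 0 in the next 2 bytes, and the TTL in milliseconds in the last 5 bytes. An ephemeralOwner with a TTL of 1 millisecond is therefore 0xff00000000000001.

    To add a new extended feature:

    • Add a new name to the enum
    • Define a constant EXTENDED_BIT_xxxx that is next in line after TTL, i.e. 0x0001
    • Add a mapping to extendedFeatureMap through the static initializer

      Note: technically a container node is also an extended type, but because it was implemented before this feature it is represented specially. By definition, an ephemeral owner with only the high bit set (0x8000000000000000L) is a container node, whether or not extended types are enabled.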
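
The bit layout described above can be checked with a few masks. The sketch assumes extended types are enabled. `EphemeralOwnerDemo` and its constant names are made up for illustration and only mirror the layout from the text: high 8 bits set, a 2-byte feature id, and a 5-byte value.

```java
final class EphemeralOwnerDemo {
    enum Kind { VOID, NORMAL, CONTAINER, TTL }

    static final long CONTAINER_OWNER = 0x8000000000000000L; // only the high bit set
    static final long EXTENDED_MASK = 0xff00000000000000L;   // high 8 bits
    static final long FEATURE_MASK = 0x00ffff0000000000L;    // next 2 bytes: feature id
    static final long VALUE_MASK = 0x000000ffffffffffL;      // low 5 bytes: feature value
    static final long TTL_FEATURE = 0L;                      // TTL has feature value 0

    /** Packs a TTL in milliseconds into an extended ephemeralOwner. */
    static long ttlOwner(long ttlMillis) {
        if (ttlMillis <= 0 || ttlMillis > VALUE_MASK) {
            throw new IllegalArgumentException("ttl out of range: " + ttlMillis);
        }
        return EXTENDED_MASK | (TTL_FEATURE << 40) | ttlMillis;
    }

    /** Classifies an ephemeralOwner value, assuming extended types are enabled. */
    static Kind classify(long owner) {
        if (owner == 0) {
            return Kind.VOID;
        }
        if (owner == CONTAINER_OWNER) {
            return Kind.CONTAINER;
        }
        boolean extended = (owner & EXTENDED_MASK) == EXTENDED_MASK;
        long feature = (owner & FEATURE_MASK) >>> 40;
        if (extended && feature == TTL_FEATURE) {
            return Kind.TTL;
        }
        return Kind.NORMAL; // anything else is treated as a session id
    }

    /** Extracts the TTL in milliseconds from a TTL owner value. */
    static long ttlMillis(long owner) {
        return owner & VALUE_MASK;
    }

    public static void main(String[] args) {
        System.out.println(Long.toHexString(ttlOwner(1))); // ff00000000000001
        System.out.println(classify(CONTAINER_OWNER));     // CONTAINER
    }
}
```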

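      TTL nodes

      • Disabled by default; enabled through the zookeeper.extendedTypesEnabled property
      • Added in 3.5.3
      • When creating a PERSISTENT or PERSISTENT_SEQUENTIAL node, a TTL in milliseconds can be set. If the znode is not modified within the TTL and has no children, it becomes a candidate to be deleted by the server at some point in the future

        Container nodes

        • Added in 3.5.3
        • When the last child of a container node is deleted, the container becomes a candidate to be deleted by the server at some point in the future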
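
Both rules boil down to a predicate over a node's stat. The following sketch states them as plain functions. It is an interpretation of the two descriptions above, not the server's cleanup code. `ReaperRuleDemo` is a hypothetical name, and using `cversion > 0` to mean "has had at least one child" is an assumption made for this example.

```java
final class ReaperRuleDemo {
    /** TTL node: no children and not modified for longer than the TTL. */
    static boolean ttlCandidate(int childCount, long mtimeMillis, long nowMillis, long ttlMillis) {
        return childCount == 0 && ttlMillis > 0 && (nowMillis - mtimeMillis) > ttlMillis;
    }

    /** Container node: empty now, but its children have changed before (assumed: cversion > 0). */
    static boolean containerCandidate(int childCount, int cversion) {
        return childCount == 0 && cversion > 0;
    }

    public static void main(String[] args) {
        System.out.println(ttlCandidate(0, 1_000L, 7_000L, 5_000L)); // true
        System.out.println(containerCandidate(0, 0));                // false
    }
}
```

A container that never had a child is not a candidate under this reading, which keeps a freshly created container from being removed before anyone uses it.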

          The Stat class

          It holds the node's attributes. The fields are:

          • czxid The zxid of the change that caused this znode to be created.
          • mzxid The zxid of the change that last modified this znode.
          • pzxid The zxid of the change that last modified children of this znode.
          • ctime The time in milliseconds from epoch when this znode was created.
          • mtime The time in milliseconds from epoch when this znode was last modified.
          • version The number of changes to the data of this znode.
          • cversion The number of changes to the children of this znode.
          • aversion The number of changes to the ACL of this znode.
          • ephemeralOwner The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
          • dataLength The length of the data field of this znode.
          • numChildren The number of children of this znode.

            Deleting a node

            This covers commands such as delete and deleteContainer.

            PrepRequestProcessor: transaction preparation

            Deserializing the request parameters

            private void pRequestHelper(Request request) {
                try {
                    switch (request.type) {
                    // ...
                    case OpCode.deleteContainer:
                        DeleteContainerRequest deleteContainerRequest =
                            request.readRequestRecord(DeleteContainerRequest::new);
                        pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
                        break;
                    case OpCode.delete:
                        DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
                        pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
                        break;
                    }
                    // ...
                }
            }
            

            The DeleteContainerRequest class:

            public class DeleteContainerRequest implements Record {
                private String path;
            }
            

            The DeleteRequest class:

            public class DeleteRequest implements Record {
              private String path;
              private int version;
            }
            

            Preparing the transaction

            protected void pRequest2Txn(int type, long zxid, Request request,
                         Record record) throws KeeperException, IOException, RequestProcessorException {
                // omitted
                switch (type) {
                // omitted
                case OpCode.deleteContainer: {
                    DeleteContainerRequest txn = (DeleteContainerRequest) record;
                    String path = txn.getPath();
                    String parentPath = getParentPathAndValidate(path);
                    ChangeRecord nodeRecord = getRecordForPath(path); // the node to delete
                    if (nodeRecord.childCount > 0) { // a node with children cannot be deleted
                        throw new KeeperException.NotEmptyException(path);
                    }
                    if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
                        throw new KeeperException.BadVersionException(path);
                    }
                    ChangeRecord parentRecord = getRecordForPath(parentPath); // the parent node
                    request.setTxn(new DeleteTxn(path));
                    // addChangeRecord omitted
                    break;
                }
                case OpCode.delete:
                    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    DeleteRequest deleteRequest = (DeleteRequest) record;
                    String path = deleteRequest.getPath();
                    String parentPath = getParentPathAndValidate(path);
                    ChangeRecord parentRecord = getRecordForPath(parentPath); // the parent node
                    // check the DELETE permission
                    zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
                    ChangeRecord nodeRecord = getRecordForPath(path); // the node to delete
                    checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path); // check the version
                    if (nodeRecord.childCount > 0) { // a node with children cannot be deleted
                        throw new KeeperException.NotEmptyException(path);
                    }
                    request.setTxn(new DeleteTxn(path));
                    // addChangeRecord omitted
                    break;
                }
            }
            

            FinalRequestProcessor: applying the transaction

            processTxn

            public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
                ProcessTxnResult rc = new ProcessTxnResult();
                try {
                    rc.clientId = header.getClientId();
                    rc.cxid = header.getCxid();
                    rc.zxid = header.getZxid();
                    rc.type = header.getType();
                    rc.err = 0;
                    rc.multiResult = null;
                    switch (header.getType()) {
                    // ...
                    case OpCode.delete:
                    case OpCode.deleteContainer:
                        DeleteTxn deleteTxn = (DeleteTxn) txn;
                        rc.path = deleteTxn.getPath();
                        deleteNode(deleteTxn.getPath(), header.getZxid());
                        break;
                    }
                    // ...
                }
            }
            

            deleteNode

            public void deleteNode(String path, long zxid) throws NoNodeException {
                int lastSlash = path.lastIndexOf('/');
                String parentName = path.substring(0, lastSlash);
                String childName = path.substring(lastSlash + 1);
                DataNode parent = nodes.get(parentName);
                if (parent == null) { // the parent node must exist
                    throw new NoNodeException();
                }
                synchronized (parent) {
                    nodes.preChange(parentName, parent);
                    parent.removeChild(childName);
                    if (zxid > parent.stat.getPzxid()) {
                        parent.stat.setPzxid(zxid); // The zxid of the change that last modified children of this znode
                    }
                    nodes.postChange(parentName, parent);
                }
                DataNode node = nodes.get(path); // the node being deleted
                if (node == null) {
                    throw new NoNodeException();
                }
                nodes.remove(path); // remove from the NodeHashMap
                synchronized (node) { // release the ACL reference
                    aclCache.removeUsage(node.acl);
                    nodeDataSize.addAndGet(-getNodeSize(path, node.data));
                }
                // remove the path from the ephemeral, container, and TTL caches
                synchronized (parent) {
                    long owner = node.stat.getEphemeralOwner();
                    EphemeralType ephemeralType = EphemeralType.get(owner);
                    if (ephemeralType == EphemeralType.CONTAINER) {
                        containers.remove(path);
                    } else if (ephemeralType == EphemeralType.TTL) {
                        ttls.remove(path);
                    } else if (owner != 0) {
                        Set<String> nodes = ephemerals.get(owner);
                        if (nodes != null) {
                            synchronized (nodes) {
                                nodes.remove(path);
                            }
                        }
                    }
                }
                // omitted
                // fire NodeDeleted watches
                WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
                childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
                // fire NodeChildrenChanged watches on the parent
                childWatches.triggerWatch(
                    "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
            }
            

            Setting node data

            PrepRequestProcessor: transaction preparation

            Deserializing the request parameters

            private void pRequestHelper(Request request) {
                try {
                    switch (request.type) {
                    // ...
                    case OpCode.setData:
                        SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
                        pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
                        break;
                    // other case
                    }
                }
                // ...
            }
            

            The SetDataRequest class:

            public class SetDataRequest implements Record {
              private String path;
              private byte[] data;
              private int version;
            }
            

            Preparing the transaction

            protected void pRequest2Txn(int type, long zxid, Request request,
                      Record record) throws KeeperException, IOException, RequestProcessorException {
                // omitted
                switch (type) {
                // ...
                case OpCode.setData:
                    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    SetDataRequest setDataRequest = (SetDataRequest) record;
                    path = setDataRequest.getPath();
                    validatePath(path, request.sessionId);
                    nodeRecord = getRecordForPath(path); // look up the node
                    // check the WRITE permission
                    zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
                    // check the byte quota
                    zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
                    // version++
                    int newVersion = checkAndIncVersion(
                        nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
                    // create the SetDataTxn
                    request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
                    // addChangeRecord
                    break;
                // other case
                }
            }
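
The version check used by both delete and setData is an optimistic-concurrency test. Below is a minimal sketch of the rule, assuming the usual ZooKeeper convention that an expected version of -1 matches any version. `VersionCheckDemo` and `BadVersion` are illustrative stand-ins, not the real classes.

```java
final class VersionCheckDemo {
    /** Stand-in for the bad-version error raised on a mismatch. */
    static final class BadVersion extends RuntimeException {
        BadVersion(String path) {
            super("bad version for " + path);
        }
    }

    /**
     * Rejects the request when the caller's expected version differs from the
     * current one (unless it is -1), otherwise returns the next version.
     */
    static int checkAndIncVersion(int currentVersion, int expectedVersion, String path) {
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new BadVersion(path);
        }
        return currentVersion + 1;
    }

    public static void main(String[] args) {
        System.out.println(checkAndIncVersion(3, 3, "/a"));  // 4
        System.out.println(checkAndIncVersion(3, -1, "/a")); // 4
    }
}
```

A client that read version 3 and sends 3 back succeeds only if nobody has written in between; a concurrent writer would already have moved the node to version 4.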
            

            FinalRequestProcessor: applying the transaction

            processTxn

            public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
                ProcessTxnResult rc = new ProcessTxnResult();
                try {
                    rc.clientId = header.getClientId();
                    rc.cxid = header.getCxid();
                    rc.zxid = header.getZxid();
                    rc.type = header.getType();
                    rc.err = 0;
                    rc.multiResult = null;
                    switch (header.getType()) {
                    // other case
                    case OpCode.setData:
                        SetDataTxn setDataTxn = (SetDataTxn) txn;
                        rc.path = setDataTxn.getPath();
                        rc.stat = setData(
                            setDataTxn.getPath(),
                            setDataTxn.getData(),
                            setDataTxn.getVersion(),
                            header.getZxid(),
                            header.getTime());
                        break;
                    // other case
                    }
                }
                // ...
            }
            

            setData

            public Stat setData(String path, byte[] data, int version,
                                long zxid, long time) throws NoNodeException {
                Stat s = new Stat();
                DataNode n = nodes.get(path);
                if (n == null) { // the node must exist
                    throw new NoNodeException();
                }
                byte[] lastData;
                synchronized (n) {
                    lastData = n.data;
                    nodes.preChange(path, n);
                    n.data = data; // node data
                    n.stat.setMtime(time); // modification time
                    n.stat.setMzxid(zxid); // modification zxid
                    n.stat.setVersion(version); // data version
                    n.copyStat(s);
                    nodes.postChange(path, n);
                }
                // omitted
                // fire NodeDataChanged watches
                dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
                return s;
            }
            

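            Reading node data

            PrepRequestProcessor: session validation

            For read requests this processor only validates the session.

            The request then passes straight through ProposalRequestProcessor, CommitProcessor, and ToBeAppliedRequestProcessor without any transaction handling, and FinalRequestProcessor reads the data and sends the response.

            FinalRequestProcessor: reading the data

            The handleGetDataRequest method performs the read: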

            private Record handleGetDataRequest(
                    Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
                GetDataRequest getDataRequest = (GetDataRequest) request;
                String path = getDataRequest.getPath();
                DataNode n = zks.getZKDatabase().getNode(path);
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                // check the READ permission
                zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
                Stat stat = new Stat();
                // read the data;
                // if the watcher argument is non-null, a watch is registered on the path
                byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
                return new GetDataResponse(b, stat);
            }
            

            The GetDataRequest class:

            public class GetDataRequest implements Record {
              private String path;
              private boolean watch;
            }
            

            Node watches

            The addWatch command

            case OpCode.addWatch: {
                lastOp = "ADDW";
                AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
                // the watcher is eventually registered through DataTree.addWatch
                // cnxn is a ServerCnxn, which implements the Watcher interface
                zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
                rsp = new ErrorResponse(0);
                break;
            }
            

            DataTree's addWatch method

            public void addWatch(String basePath, Watcher watcher, int mode) {
                // PERSISTENT|PERSISTENT_RECURSIVE
                WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
                // dataWatches and childWatches are WatchManager instances
                dataWatches.addWatch(basePath, watcher, watcherMode);
                if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
                    childWatches.addWatch(basePath, watcher, watcherMode);
                }
            }
            

            The WatcherMode enum

            public enum WatcherMode {
                STANDARD(false, false),
                PERSISTENT(true, false), // persistent=0
                PERSISTENT_RECURSIVE(true, true), // persistentRecursive=1
                ;
            }
            

            PERSISTENT and PERSISTENT_RECURSIVE were added in version 3.6.0.

            The Watcher interface

            Implementations must provide the process method:

            void process(WatchedEvent event);
            

            WatchedEvent represents a single watch event:

            public class WatchedEvent {
                // current state of the ZooKeeper server
                private final KeeperState keeperState;
                // NodeCreated|NodeDeleted|NodeDataChanged|NodeChildrenChanged, etc.
                private final EventType eventType;
                private final String path;
                private final long zxid;
            }
            

            Important implementations:

            • NIOServerCnxn
            • NettyServerCnxn

              The WatchManager class

              This class manages watches. It allows watches to be associated with a string and removes watchers and their watches in addition to managing triggers.

              Key fields:

              // path -> set of Watchers
              private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
              // Watcher -> (path -> WatchStats), e.g. STANDARD + PERSISTENT or PERSISTENT + PERSISTENT_RECURSIVE
              private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();
              

              WatchStats类

              public final class WatchStats {
                  private static final WatchStats[] WATCH_STATS = new WatchStats[] {
                          new WatchStats(0), // NONE
                          new WatchStats(1), // STANDARD
                          new WatchStats(2), // PERSISTENT
                          new WatchStats(3), // STANDARD + PERSISTENT
                          new WatchStats(4), // PERSISTENT_RECURSIVE
                          new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
                          new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
                          new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
                  };
                  public static final WatchStats NONE = WATCH_STATS[0];
                  private final int flags;
                  private WatchStats(int flags) {
                      this.flags = flags;
                  }
                  private static int modeToFlag(WatcherMode mode) {
                      return 1 << mode.ordinal(); // e.g. mode = STANDARD returns 1
                  }
                  public WatchStats addMode(WatcherMode mode) {
                      int flags = this.flags | modeToFlag(mode); // OR keeps the modes already set
                      return WATCH_STATS[flags];
                  }
                  public WatchStats removeMode(WatcherMode mode) {
                      int mask = ~modeToFlag(mode); // bitwise NOT
                      int flags = this.flags & mask;
                      if (flags == 0) {
                          return NONE;
                      }
                      return WATCH_STATS[flags];
                  }
                  // ...
              }
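
The WATCH_STATS table above encodes each mode as one bit (1, 2, 4), so combinations are plain bit arithmetic. Here is a standalone sketch of that arithmetic. `WatchFlagsDemo` and its `Mode` enum are hypothetical stand-ins that follow the same ordering as the table comments.

```java
final class WatchFlagsDemo {
    enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    /** One bit per mode: STANDARD = 1, PERSISTENT = 2, PERSISTENT_RECURSIVE = 4. */
    static int flagOf(Mode mode) {
        return 1 << mode.ordinal();
    }

    static int add(int flags, Mode mode) {
        return flags | flagOf(mode); // OR keeps the modes already present
    }

    static int remove(int flags, Mode mode) {
        return flags & ~flagOf(mode); // AND with the inverted bit clears one mode
    }

    static boolean has(int flags, Mode mode) {
        return (flags & flagOf(mode)) != 0;
    }

    public static void main(String[] args) {
        int flags = add(add(0, Mode.STANDARD), Mode.PERSISTENT);
        System.out.println(flags);                        // 3
        System.out.println(remove(flags, Mode.STANDARD)); // 2
    }
}
```

Because there are only eight possible values, every combination can be preallocated once and looked up by index, which is what the WATCH_STATS array does.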
              
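              WatchManager's addWatch method

              public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
                  Set<Watcher> list = watchTable.get(path);
                  if (list == null) {
                      list = new HashSet<>(4);
                      watchTable.put(path, list);
                  }
                  list.add(watcher);
                  Map<String, WatchStats> paths = watch2Paths.get(watcher);
                  if (paths == null) {
                      // cnxns typically have many watches, so use default cap here
                      paths = new HashMap<>();
                      watch2Paths.put(watcher, paths);
                  }
                  WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
                  WatchStats newStats = stats.addMode(watcherMode);
                  if (newStats != stats) {
                      paths.put(path, newStats);
                      if (watcherMode.isRecursive()) {
                          ++recursiveWatchQty;
                      }
                      return true;
                  }
                  return false;
              }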
              
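              WatchManager's triggerWatch method

              public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
                  WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
                  Set<Watcher> watchers = new HashSet<>();
                  synchronized (this) {
                      // path iterator that walks from the node up through its parents
                      // e.g. /apps/app1/name
                      // next = /apps/app1/name, next = /apps/app1, next = /apps ...
                      PathParentIterator pathParentIterator = getPathParentIterator(path);
                      for (String localPath : pathParentIterator.asIterable()) {
                          // iterate over the Watcher set of this path
                          Set<Watcher> thisWatchers = watchTable.get(localPath);
                          if (thisWatchers == null || thisWatchers.isEmpty()) {
                              continue;
                          }
                          Iterator<Watcher> iterator = thisWatchers.iterator();
                          while (iterator.hasNext()) {
                              Watcher watcher = iterator.next();
                              // look up the WatchStats of this watcher
                              Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
                              WatchStats stats = paths.get(localPath);
                              if (stats == null) {
                                  continue;
                              }
                              if (!pathParentIterator.atParentPath()) {
                                  watchers.add(watcher); // add to watchers
                                  WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
                                  if (newStats == WatchStats.NONE) { // a STANDARD-only watch is removed once it fires
                                      iterator.remove();
                                      paths.remove(localPath);
                                  } else if (newStats != stats) {
                                      paths.put(localPath, newStats);
                                  }
                              } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
                                  // watchers on a parent path are added only in recursive mode
                                  watchers.add(watcher);
                              }
                          }
                          if (thisWatchers.isEmpty()) {
                              watchTable.remove(localPath);
                          }
                      }
                  }
                  // omitted
                  for (Watcher w : watchers) {
                      if (supress != null && supress.contains(w)) {
                          continue;
                      }
                      w.process(e);
                  }
                  // omitted
                  return new WatcherOrBitSet(watchers);
              }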
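
The trigger logic combines three rules: walk from the changed path up to the root, fire and remove one-shot STANDARD watches on the path itself, and fire parent watches only when they are PERSISTENT_RECURSIVE. The sketch below models exactly that on plain collections. It assumes one mode per watcher per path and absolute paths, and `MiniWatchDemo` with all of its members is hypothetical.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class MiniWatchDemo {
    enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // path -> (watcher name -> mode)
    private final Map<String, Map<String, Mode>> table = new HashMap<>();

    void addWatch(String path, String watcher, Mode mode) {
        table.computeIfAbsent(path, k -> new HashMap<>()).put(watcher, mode);
    }

    /** Returns the watchers to notify for an event on path, walking up to the root. */
    Set<String> trigger(String path) {
        Set<String> notified = new LinkedHashSet<>();
        String current = path;
        boolean atParent = false;
        while (current != null) {
            Map<String, Mode> watchers = table.get(current);
            if (watchers != null) {
                Iterator<Map.Entry<String, Mode>> it = watchers.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, Mode> entry = it.next();
                    if (!atParent) {
                        notified.add(entry.getKey());
                        if (entry.getValue() == Mode.STANDARD) {
                            it.remove(); // one-shot watch: gone after it fires
                        }
                    } else if (entry.getValue() == Mode.PERSISTENT_RECURSIVE) {
                        notified.add(entry.getKey()); // parents fire only in recursive mode
                    }
                }
                if (watchers.isEmpty()) {
                    table.remove(current);
                }
            }
            current = parentOf(current);
            atParent = true;
        }
        return notified;
    }

    /** "/apps/app1" -> "/apps", "/apps" -> "/", "/" -> null. */
    static String parentOf(String path) {
        if (path.equals("/")) {
            return null;
        }
        int lastSlash = path.lastIndexOf('/');
        return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
    }

    public static void main(String[] args) {
        MiniWatchDemo m = new MiniWatchDemo();
        m.addWatch("/apps/app1/name", "a", Mode.STANDARD);
        m.addWatch("/apps/app1", "c", Mode.PERSISTENT);
        m.addWatch("/apps", "b", Mode.PERSISTENT_RECURSIVE);
        System.out.println(m.trigger("/apps/app1/name")); // [a, b]
        System.out.println(m.trigger("/apps/app1/name")); // [b]
    }
}
```

Watcher `c` is never notified here: its PERSISTENT watch sits on a parent path and is not recursive.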
              
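              The process method of the ServerCnxn implementations

              public void process(WatchedEvent event) {
                  ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
                  // convert to WatcherEvent so the event can be sent over the network
                  WatcherEvent e = event.getWrapper();
                  // push the event to the client
                  int responseSize = sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
                  ServerMetrics.getMetrics().WATCH_BYTES.add(responseSize);
              }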
              