资讯专栏INFORMATION COLUMN

中间件 - ZooKeeper应用场景实践

XFLY / 2366人阅读

摘要:三分布式锁这部分是重要功能,在此基础上实现诸如,分布式协调通知,负载均衡,选举等复杂场景。针对此情况,改进后判断读写顺序为创建完临时顺序节点后,获取下的所有子节点。

注:该文章用作回顾记录

一、准备工作

预先下载安装 ZooKeeper ,简单配置就能使用了。然后构建 Maven 项目,将下面的代码粘贴到 pom.xml中:

      
        org.apache.zookeeper  
        zookeeper  
        3.4.5  
      
      
        com.101tec  
        zkclient  
        0.5  
     

zkclient 是开源的客户端工具,其中封装了很多功能,比如:删除包含子节点的父节点,连接重试,异步回调,偏向 Java 写法的注册监听等,极大地方便了用户使用。

下面不过多介绍客户端操作,只针对应用场景做介绍,该文章会随着本人的学习持续补充。

二、数据发布/订阅

使用 ZooKeeper 节点监听来实现该功能:

ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 连接集群

zkClient.createPersistent("/xxx/xxx"); // 创建持久节点

// 注册子节点变更监听,当子节点改变(比如创建了"/xxx/xxx/1")或当前节点删除等,会触发异步回调
zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() {
    @Override
    public void handleChildChange(String parentPath, List currentChilds) 
        throws Exception {
    }
});

下面为部分源码:

package org.I0Itec.zkclient;

public class ZkClient implements Watcher {

    public List watchForChilds(final String path) {
        return retryUntilConnected(new Callable>() {
            @Override
            public List call() throws Exception {
                exists(path, true);
                try {
                    return getChildren(path, true);
                } catch (ZkNoNodeException e) {
                }
                return null;
            }
        });
    }
    
    public  T retryUntilConnected(Callable callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
        final long operationStartTime = System.currentTimeMillis();
        while (true) {
            if (_closed) {
                throw new IllegalStateException("ZkClient already closed!");
            }
            try {
                return callable.call();
            } catch (Exception e) {
                throw ExceptionUtil.convertToRuntimeException(e);
            }
        }
    }
}

基于 ZooKeeper 实现的数据发布/订阅很简单吧,快动手试试。

三、分布式锁

这部分是 ZooKeeper 重要功能,在此基础上实现诸如,分布式协调/通知,负载均衡,Master选举等复杂场景。

1、排它锁

排它锁又称为写锁或独占锁。比如事务 T1 对数据对象 O1 加了排它锁,那么在整个加锁期间,只允许 T1 对 O1 进行读取或更新操作,其它事务都不能对 O1 操作。

1)获取锁

所有客户端都创建临时节点 zkClient.createEphemeral("/xxx/xxx", null);,ZooKeeper 会保证在所有客户端中,最终只有一个客户端能创建成功,那么就认为该客户端获取了锁。同时,所有没获取到锁的客户端需在/xxx/xxx 上注册子节点变更监听,以便实时监听节点变化。如节点发生变化,则未获取到锁的客户端再重新获取锁。

private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
private static final String lockParentPath = "/zk-book/exclusice_lock";

public static void main(String[] args) throws InterruptedException {
    try {
        zkClient.createEphemeral(lockParentPath + "/lock");
        System.out.println("service3 获取锁成功");
    } catch (Exception e) {
        System.out.println("service3获取锁失败");
        zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List currentChilds)
                    throws Exception {
                System.out.println("service3再次获取锁");
                main(null);
            }
        });
    }
    Thread.sleep(Integer.MAX_VALUE);
}

2)释放锁

"/xxx/xxx" 是临时节点时,以下俩种情况都会释放锁。

当已获取锁的客户机宕机,导致连接超时断开,那么 ZooKeeper 会将临时节点删除。

正常执行完逻辑后,客户端主动将临时节点删除。

2、共享锁

共享锁又称为读锁。如果事务 T1 对数据对象 O1 加了共享锁,那么 T1 只能对 O1 进行读取操作,其它事务只能对 O1 加共享锁,直到 O1 上所有共享锁都被释放。

1)获取锁

所有客户端都创建临时顺序节点 zkClient.createEphemeralSequential("/xxx/xxx", null);,ZooKeeper 会生成类似下面的节点,已保证节点的唯一性。

2)判断读写顺序

创建完临时顺序节点后,获取 "/xxx" 下的所有子节点,并对该节点注册子节点变更监听。

确定创建完的临时顺序节点在所有节点中的顺序。

对于读节点:
没有比自己序号小的节点,或比自己序号小的节点都是读节点,则成功获取到共享锁。
如果比自己序号小的节点中存在写节点,则需进入等待。
对于写节点:
如果自己不是序号最小的节点,则需进入等待。

接受到子节点变更通知后,重复步骤1

以下为实现代码:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分布式共享锁
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLock {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static volatile boolean isExecuted = false;
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null);
        String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1);
        
        List currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        if (currentChilds.size() > 0)
            isExecuted = getLockAndExecute(currentChilds, node);
        
        zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List currentChilds)
                    throws Exception {
                if (currentChilds.size() > 0) {
                    currentChilds = sortNodes(currentChilds);
                    isExecuted = getLockAndExecute(currentChilds, node);
                }
            }
        });
        
        while (!isExecuted) {
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序节点
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List sortNodes(List nodes) {
        Collections.sort(nodes, new Comparator() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比较序列号
            }
        });
        return nodes;
    }
    
    /**
     * 获取节点位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @return
     */
    private static Integer getNodePosition(List nodes, String node) {
        for (int i = 0, size = nodes.size(); i < size; i++) {
            if (nodes.get(i).equals(node))
                return i;
        }
        return null; // 无此数据
    }
    
    /**
     * 是否得到锁
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @param nodePosition
     * @return
     */
    private static boolean isGetLock(List nodes, String node, int nodePosition) {
        if (nodePosition == 0) // 没有比此序号更小的节点 
            return true;
        if (node.indexOf("r-") > -1) { // 读节点
            for (int i = 0; i < nodePosition; i++) { // 遍历小于次序号的节点
                String nodeTemp = nodes.get(i);
                if (nodeTemp.indexOf("w-") > -1)  // 存在写节点,则进入等待锁
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 获取锁并执行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLockAndExecute(List currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null) // 子节点为空
            return false;
        System.out.println("子节点:" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition);
        boolean isGetLock = isGetLock(currentChilds, node, nodePosition);
        if (isGetLock) {
            System.out.println(node + " 成功获取到锁,开始执行耗时任务");
            doSomething();
            boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node);
            if (isSuccess)
                System.out.println(node + " 成功执行完任务并删除节点");
        } else {
            System.out.println(node + " 未获取到锁");
        }
        return isGetLock;
    }
    
    private static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

测试以上代码会发现,当获取锁的节点过多时,某一节点变更会通知所有节点,会对 ZooKeeper 服务器造成巨大的性能影响和网络冲击,服务器会发送给客户端大量的事件通知。比如有以下节点,当 w-24 节点变更时,会通知给其余节点。

因为当获取共享锁时,要判断比自己序号小的节点,所以应该只给 r-25 节点发送通知。针对此情况,改进后判断读写顺序为:

创建完临时顺序节点后,获取 "/xxx" 下的所有子节点。

客户端调用 getChildren() 来获取子节点列表,注意,这里不注册任何监听。

如果未获取到共享锁,那么找到比自己序号小的节点来注册监听,分为以下俩种情况:
读节点:比自己序号小的最后一个写节点注册监听
写节点:比自己序号小的最后一个节点注册监听

等待监听通知,重复步骤2

改进后的共享锁代码实现:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分布式共享锁最优
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLockOptimal {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null);
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        List currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1);
        
        boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node);
        System.out.println("当前所有节点:" + currentChilds.toString() + ", 该" + (isReadNode ? "读" : "写") + "节点:" + node);
        
        if (isGetLock) {
            execute(node);
            System.out.println("退出程序");
            System.exit(1);
        } else {
            String monitorNode = getMonitorNode(currentChilds, node);
            System.out.println(node + " 未获取到锁,注册监听节点:" + monitorNode);
            if (null != monitorNode) {
                zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() {
                    @Override
                    public void handleChildChange(String parentPath, List currentChilds)
                            throws Exception {
                        main(null); // 递归调用
                    }
                });
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序节点
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List sortNodes(List nodes) {
        Collections.sort(nodes, new Comparator() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比较序列号
            }
        });
        return nodes;
    }
    
    /**
     * 获取节点位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static Integer getNodePosition(List currentChilds, String node) {
        for (int i = 0, size = currentChilds.size(); i < size; i++) {
            if (currentChilds.get(i).equals(node))
                return i;
        }
        return null;
    }
    
    /**
     * 获取监听节点
     * @author alexnevsky
     * @date 2018年5月25日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static String getMonitorNode(List currentChilds, String node) {
        String monitorNode = null;
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (0 < nodePosition) { // 非首节点
            if (node.indexOf("r-") > -1) { // 读节点
                // 获取比当前序号小的最后一个写节点
                for (int i = nodePosition - 1; i >= 0; i--) {
                    String tempNode = currentChilds.get(i);
                    if (tempNode.indexOf("w-") > -1) 
                        return tempNode;
                }
            } else {
                // 获取比当前序号小的最后一个节点
                return currentChilds.get(nodePosition - 1);
            }
        }
        return monitorNode;
    }
    
    /**
     * 获取锁
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLock(List currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null)
            return false;
        if (nodePosition == 0) // 无序号更小的节点 
            return true;
        if (node.indexOf("r-") > -1) { // 读节点
            for (int i = 0; i < nodePosition; i++) { // 遍历前面序号的节点
                String tempNode = currentChilds.get(i);
                if (tempNode.indexOf("w-") > -1)  // 存在写节点,返回失败
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 执行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param node
     * @return
     */
    private static void execute(String node) {
        System.out.println(node + " 成功获取到锁,开始执行耗时任务");
        doSomething();
        boolean isDeletedLock = zkClient.delete(nodeFullPath);
        System.out.println(node + " 成功执行完任务,删除节点" + (isDeletedLock ? "成功" : "失败"));
    }
    
    /**
     * 模拟耗时任务
     * @author alexnevsky
     * @date 2018年5月25日
     */
    public static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69512.html

相关文章

  • Dubbo Cloud Native 之路的实践与思考

    摘要:可简单地认为它是的扩展,负载均衡自然成为不可或缺的特性。是基于开发的服务代理组件,在使用场景中,它与和整合,打造具备服务动态更新和负载均衡能力的服务网关。类似的特性在项目也有体现,它是另一种高性能代理的方案,提供服务发现健康和负载均衡。 摘要: Cloud Native 应用架构随着云技术的发展受到业界特别重视和关注,尤其是 CNCF(Cloud Native Computing Fo...

    niceforbear 评论0 收藏0
  • 【推荐】最新200篇:技术文章整理

    摘要:作为面试官,我是如何甄别应聘者的包装程度语言和等其他语言的对比分析和主从复制的原理详解和持久化的原理是什么面试中经常被问到的持久化与恢复实现故障恢复自动化详解哨兵技术查漏补缺最易错过的技术要点大扫盲意外宕机不难解决,但你真的懂数据恢复吗每秒 作为面试官,我是如何甄别应聘者的包装程度Go语言和Java、python等其他语言的对比分析 Redis和MySQL Redis:主从复制的原理详...

    BicycleWarrior 评论0 收藏0
  • 【推荐】最新200篇:技术文章整理

    摘要:作为面试官,我是如何甄别应聘者的包装程度语言和等其他语言的对比分析和主从复制的原理详解和持久化的原理是什么面试中经常被问到的持久化与恢复实现故障恢复自动化详解哨兵技术查漏补缺最易错过的技术要点大扫盲意外宕机不难解决,但你真的懂数据恢复吗每秒 作为面试官,我是如何甄别应聘者的包装程度Go语言和Java、python等其他语言的对比分析 Redis和MySQL Redis:主从复制的原理详...

    tommego 评论0 收藏0
  • Dubbo Cloud Native 实践与思考

    摘要:可简单地认为它是的扩展,负载均衡自然成为不可或缺的特性。类似的特性在项目也有体现,它是另一种高性能代理的方案,提供服务发现健康和负载均衡。 Dubbo Cloud Native 实践与思考 分享简介 Cloud Native 应用架构随着云技术的发展受到业界特别重视和关注,尤其是 CNCF(Cloud Native Computing Foundation)项目蓬勃发展之际。Dubbo...

    邱勇 评论0 收藏0

发表评论

0条评论

XFLY

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<