资讯专栏INFORMATION COLUMN

java并发实战:连接池实现

XboxYan / 714人阅读

摘要:池化技术简介在我们使用数据库的过程中,我们往往使用数据库连接池而不是直接使用数据库连接进行操作,这是因为每一个数据库连接的创建和销毁的代价是昂贵的,而池化技术则预先创建了资源,这些资源是可复用的这样就保证了在多用户情况下只能使用指定数目的资

池化技术简介

在我们使用数据库的过程中,我们往往使用数据库连接池而不是直接使用数据库连接进行操作,这是因为每一个数据库连接的创建和销毁的代价是昂贵的,而池化技术则预先创建了资源,这些资源是可复用的,这样就保证了在多用户情况下只能使用指定数目的资源,避免了一个用户创建一个连接资源,造成程序运行开销过大。

连接池实现原理

这里只实现一个简易的连接池,更多复杂的需求可根据该连接池进行改进,该连接池主要参数如下:

一个繁忙队列busy

一个空闲队列idle

连接池最大活动连接数maxActive

连接池最大等待时间maxWait

连接池的活动连接数activeSize

程序流程图如下:

代码实现

泛型接口ConnectionPool.java

public interface ConnectionPool {

    /**
     * 初始化池资源
     * @param maxActive 池中最大活动连接数
     * @param maxWait 最大等待时间
     */
    void init(Integer maxActive, Long maxWait);

    /**
     * 从池中获取资源
     * @return 连接资源
     */
    T getResource() throws Exception;

    /**
     * 释放连接
     * @param connection 正在使用的连接
     */
    void release(T connection) throws Exception;

    /**
     * 释放连接池资源
     */
    void close();


}

以zookeeper为例,实现zookeeper连接池,ZookeeperConnectionPool.java

public class ZookeeperConnectionPool implements ConnectionPool {
    //最大活动连接数
    private Integer maxActive; 
    //最大等待时间
    private Long maxWait; 
    //空闲队列
    private LinkedBlockingQueue idle = new LinkedBlockingQueue<>();
    //繁忙队列
    private LinkedBlockingQueue busy = new LinkedBlockingQueue<>();
    //连接池活动连接数
    private AtomicInteger activeSize = new AtomicInteger(0);
    //连接池关闭标记
    private AtomicBoolean isClosed = new AtomicBoolean(false);
    //总共获取的连接记数
    private AtomicInteger createCount = new AtomicInteger(0);
    //等待zookeeper客户端创建完成的计数器
    private static ThreadLocal latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1));

    public ZookeeperConnectionPool(Integer maxActive, Long maxWait) {
        this.init(maxActive, maxWait);
    }

    @Override
    public void init(Integer maxActive, Long maxWait) {
        this.maxActive = maxActive;
        this.maxWait = maxWait;
    }

    @Override
    public ZooKeeper getResource() throws Exception {
        ZooKeeper zooKeeper;
        Long nowTime = System.currentTimeMillis();
        final CountDownLatch countDownLatch = latchThreadLocal.get();
        
        //空闲队列idle是否有连接
        if ((zooKeeper = idle.poll()) == null) {
            //判断池中连接数是否小于maxActive
            if (activeSize.get() < maxActive) {
                //先增加池中连接数后判断是否小于等于maxActive
                if (activeSize.incrementAndGet() <= maxActive) {
                    //创建zookeeper连接
                    zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> {
                        if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) {
                            countDownLatch.countDown();
                        }
                    });
                    countDownLatch.await();
                    System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");
                    busy.offer(zooKeeper);
                    return zooKeeper;
                } else {
                    //如增加后发现大于maxActive则减去增加的
                    activeSize.decrementAndGet();
                }
            }
            //若活动线程已满则等待busy队列释放连接
            try {
                System.out.println("Thread:" + Thread.currentThread().getId() + "等待获取空闲资源");
                Long waitTime = maxWait - (System.currentTimeMillis() - nowTime);
                zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new Exception("等待异常");
            }
            //判断是否超时
            if (zooKeeper != null) {
                System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");
                busy.offer(zooKeeper);
                return zooKeeper;
            } else {
                System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");
                throw new Exception("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");
            }
        }
        //空闲队列有连接,直接返回
        busy.offer(zooKeeper);
        return zooKeeper;
    }

    @Override
    public void release(ZooKeeper connection) throws Exception {
        if (connection == null) {
            System.out.println("connection 为空");
            return;
        }
        if (busy.remove(connection)){
            idle.offer(connection);
        } else {
            activeSize.decrementAndGet();
            throw new Exception("释放失败");
        }
    }

    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            idle.forEach((zooKeeper) -> {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            busy.forEach((zooKeeper) -> {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
测试用例

这里创建20个线程并发测试连接池,Test.java

public class Test {

    public static void main(String[] args) throws Exception {
        int threadCount = 20;
        Integer maxActive = 10;
        Long maxWait = 10000L;
        ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait);
        CountDownLatch countDownLatch = new CountDownLatch(20);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                countDownLatch.countDown();
                try {
                    countDownLatch.await();
                    ZooKeeper zooKeeper = pool.getResource();
                    Thread.sleep(2000);
                    pool.release(zooKeeper);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }).start();
        }
        while (true){

        }
    }
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/73054.html

相关文章

  • 阿里 2021 版最全 Java 并发编程笔记,看完我才懂了“内卷”的真正意义

    摘要:纯分享直接上干货操作系统并发支持进程管理内存管理文件系统系统进程间通信网络通信阻塞队列数组有界队列链表无界队列优先级有限无界队列延时无界队列同步队列队列内存模型线程通信机制内存共享消息传递内存模型顺序一致性指令重排序原则内存语义线程 纯分享 , 直接上干货! 操作系统并发支持 进程管理内存管...

    不知名网友 评论0 收藏0
  • Java深入-框架技巧

    摘要:从使用到原理学习线程池关于线程池的使用,及原理分析分析角度新颖面向切面编程的基本用法基于注解的实现在软件开发中,分散于应用中多出的功能被称为横切关注点如事务安全缓存等。 Java 程序媛手把手教你设计模式中的撩妹神技 -- 上篇 遇一人白首,择一城终老,是多么美好的人生境界,她和他历经风雨慢慢变老,回首走过的点点滴滴,依然清楚的记得当初爱情萌芽的模样…… Java 进阶面试问题列表 -...

    chengtao1633 评论0 收藏0
  • Java多线程编程实战:模拟大量数据同步

    摘要:所以得出结论需要分配较多的线程进行读数据,较少的线程进行写数据。注意多线程编程对实际环境和需求有很大的依赖,需要根据实际的需求情况对各个参数做调整。 背景 最近对于 Java 多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的。否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果。 不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中...

    elliott_hu 评论0 收藏0

发表评论

0条评论

XboxYan

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<