资讯专栏INFORMATION COLUMN

实战java高并发程序设计第三章(二)

Sike / 925人阅读

摘要:的并发容器并发集合这是一个高效的并发你可以把它理解为一个线程安全的。可以看作一个线程安全的这是一个接口,内部通过链表数组等方式实现了这个接口。

3. JDK的并发容器

并发集合
ConcurrentHashMap:这是一个高效的并发HashMap.你可以把它理解为一个线程安全的HashMap。
CopyOnWriteArrayList:这是一个List,从名字看就知道它和ArrayList是一族的。在读多写少的场合,这个List的性能非常好,远远优于Vector。
ConcurrentLinkedQueue:高效的并发队列,使用链表实现。可以看作一个线程安全的LinkedList.
BlockingQueue:这是一个接口,JDK内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合作为数据共享的通道。
ConcurrentSkipListMap:跳表的实现。这是一个Map,使用跳表的数据结构进行快速查找。

线程安全的HashMap
可用Collections类来使普通HashMap转为线程安全的map

Collections.synchronizedMap(new HashMap())
    private static class SynchronizedMap
        implements Map, Serializable {
        private static final long serialVersionUID = 1978198479659022715L;
        private final Map m;     // 传入的map
        final Object      mutex;        // 锁资源对象,对map的任何操作都会锁该对象
        SynchronizedMap(Map m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }
        SynchronizedMap(Map m, Object mutex) {
            this.m = m;
            this.mutex = mutex;
        }
        public int size() {
            synchronized (mutex) {return m.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return m.isEmpty();}
        }
        public boolean containsKey(Object key) {
            synchronized (mutex) {return m.containsKey(key);}
        }
        public boolean containsValue(Object value) {
            synchronized (mutex) {return m.containsValue(value);}
        }
        public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }

        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);}
        }
        public V remove(Object key) {
            synchronized (mutex) {return m.remove(key);}
        }
        public void putAll(Map map) {
            synchronized (mutex) {m.putAll(map);}
        }
        public void clear() {
            synchronized (mutex) {m.clear();}
        }
        .......  //省略
  }

List的线程安全

Collections.synchronizedList(new LinkedList())

高效读写队列ConcurrentLinkedQueue类

高并发环境中性能最好的队列,主要是利用CAS进行无锁操作,非阻塞队列
首先我们来看下它的Node节点:

    private static class Node {
        volatile E item;            //当前对象
        volatile Node next;      //下一个对象,以此来构建链表
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {        //(期望值,设置目标值),cas操作
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

ConcurrentLinkedQueue类内部的tail指针更新并不是实时的,可能存在拖延现象,每次更新跳跃两个元素,如下图:

然后再看一下新增节点offer()方法:

public boolean offer(E e) {
        checkNotNull(e);        //非空校验
        final Node newNode = new Node(e);
        for (Node t = tail, p = t;;) {    //for循环 无出口,知道设置成功
            Node q = p.next;        //获取tail节点的next对象
            if (q == null) {        //第一次插入,p.next对象为空
                // p 为最后一个节点
                if (p.casNext(null, newNode)) {     //插入新元素,此时p=t
                    //每两次更新tail
                    if (p != t)         
                        casTail(t, newNode);  
                    return true;
                }
                // cas竞争失败,再次循环
            }
            else if (p == q)    //遇到哨兵
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q; //t!=(t=tail) !=并不是原子操作,先取左边t的值,再取右边t=tail
        }
    }

高效读取:不变模式下的CopyOnWriteArrayList类

使用场景:读操作远远大于写操作,读操作越快越好,写操作慢一些也没事
特点:读取不用加锁,写入不会阻塞读取操作,只有写入和写入需要同步等待,读性能大幅提升
原理:写入时进行一次自我复制,修改内容写入副本中,写完后,再用副本内容替代原来的数据

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);  //进行复制
            newElements[len] = e;    //新数组代替老数组
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

数据共享通道:BlockingQueue

BlockingQueue是接口,实现类有ArrayBlockingQueue以及LinkedBlockingQueue.当BlockingQueue为空时,会等待,当有消息进入队列后,自动唤醒线程,Condition.await()和Condition.signal(),祥见上一篇 Condition重入锁

注意:
一般生产者消费者模型中,往往采用BlockingQueue而不是ConcurrentLinkedQueeu,因为BlockingQueue带有阻塞功能,可以控制生产消费者的速率(await和signal)

随机数据结构:跳表

使用环境:高并发环境
特点:快速查找,类似平衡树,平衡树插入和删除往往会引起一次全局调整,而跳表只需局部调整,且在高并发环境下,平衡树需要全局锁,而跳表只需要局部;随机算法,跳表的本质是维护多个链表;有序性
原理:如下

4. JMH性能测试

用于测试方法的执行效率,精度达毫秒级.

maven:

        
            org.openjdk.jmh
            jmh-core
            1.18
        
        
            org.openjdk.jmh
            jmh-generator-annprocess
            1.18
            provided
        

基本概念

1.模式(Model):
model表示JMH的测量方式和角度,共四种
Throughput:整体吞吐量,一秒可执行多少次
AverageTime:调用平均时间
SampleTime:随机取样,最后输出取样结果,如"99%的调用在xxx毫秒内"
SingleShotTime:只运行一次,无warmup(热身),用于测试启动时的性能

2.迭代(Iteration)
迭代表示一次测试单位,一般为1秒

3.预热(warmup)
预热是为了测试在JIT编译后的性能

4.状态(State)
指测试范围,一种是线程范,一个线程一个对象.另外一种是基准测试范围(Benchmark),多个线程共享一个实例

5.配置类(Options)
指定一些参数,如指定测试类(include),使用进程个数(fork),预热迭代次数(warmuoIterations)

代码

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class ListTest {
    CopyOnWriteArrayList smallCopyOnWriteList = new CopyOnWriteArrayList();
    ConcurrentLinkedQueue smallConcurrentList = new ConcurrentLinkedQueue();
    CopyOnWriteArrayList bigCopyOnWriteList = new CopyOnWriteArrayList();
    ConcurrentLinkedQueue bigConcurrentList = new ConcurrentLinkedQueue();
    @Setup
    public void setup() {
        for (int i = 0; i < 10; i++) {
            smallCopyOnWriteList.add(new Object());
            smallConcurrentList.add(new Object());
        }

        for (int i = 0; i < 1000; i++) {
            bigCopyOnWriteList.add(new Object());
            bigCopyOnWriteList.add(new Object());
        }
    }
    @Benchmark
    public void copyOnWriteGet() {
        smallCopyOnWriteList.get(0);
    }
    @Benchmark
    public void copyOnWriteSize() {
        smallCopyOnWriteList.size();
    }
    @Benchmark
    public void concurrentListGet() {
        smallConcurrentList.peek();
    }
    @Benchmark
    public void concurrentListSize() {
        smallConcurrentList.size();
    }
    @Benchmark
    public void smallCopyOnWriteWrite() {
        smallCopyOnWriteList.add(new Object());
        smallCopyOnWriteList.remove(0);
    }
    @Benchmark
    public void smallConcurrentListWrite() {
        smallConcurrentList.add(new Object());
        smallConcurrentList.remove(0);
    }
    @Benchmark
    public void bigCopyOnWriteWrite() {
        bigCopyOnWriteList.add(new Object());
        bigCopyOnWriteList.remove(0);
    }
    @Benchmark
    public void bigConcurrentListWrite() {
        bigConcurrentList.offer(new Object());
        bigConcurrentList.remove(0);
    }
    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder().include(ListTest.class.getSimpleName()).forks(1).warmupIterations(5)
                .measurementIterations(5).threads(4).build();
        new Runner(opt).run();
    }
}

性能思考

hashmap和concurrenthashmap的对比

单线程下,hashmap的get方法比concurrenthashmap略慢,size()方法却快得多,同步hashmap,size方法仅比concurrenthashmap略快一点

CopyOnWriteArrayList类与ConcurrentLinkedQueue类

当元素总量不大时,绝大部分场景中CopyOnWriteArrayList性能要优于ConcurrentLinkedQueue

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76116.html

相关文章

  • 实战java并发程序设计第三(一)

    摘要:只要线程池未关闭该策略直接在调用者线程中运行当前被丢弃的任务。显然这样做不会真的丢弃任务但是任务提交线程的性能极有可能会急剧下降。任务并尝试再次提交当前任务。 1. 同步控制 synchronized的扩展:重入锁 同步控制不仅有synchronized配合object.wait()以及object.notify(),也有增强版的reentrantLock(重入锁) public cl...

    joyvw 评论0 收藏0
  • 从小白程序员一路晋升为大厂级技术专家我看过哪些书籍?(建议收藏)

    摘要:大家好,我是冰河有句话叫做投资啥都不如投资自己的回报率高。马上就十一国庆假期了,给小伙伴们分享下,从小白程序员到大厂高级技术专家我看过哪些技术类书籍。 大家好,我是...

    sf_wangchong 评论0 收藏0
  • 实战Java并发程序设计5】让普通变量也享受原子操作

    摘要:有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求。它可以让你在不改动或者极少改动原有代码的基础上,让普通的变量也享受操作带来的线程安全性,这样你可以修改极少的代码,来获得线程安全的保证。 有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求。如果改动不大,我们可以简单地修改程序中每一个使用或者读取这个变量的地方。但显然,这...

    appetizerio 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<