资讯专栏INFORMATION COLUMN

长文慎入-探索Java并发编程与高并发解决方案

SimpleTriangle / 2122人阅读

摘要:所有示例代码请见下载于基本概念并发同时拥有两个或者多个线程,如果程序在单核处理器上运行多个线程将交替地换入或者换出内存这些线程是同时存在的,每个线程都处于执行过程中的某个状态,如果运行在多核处理器上此时,程序中的每个线程都

所有示例代码,请见/下载于

https://github.com/Wasabi1234...



1 基本概念 1.1 并发

同时拥有两个或者多个线程,如果程序在单核处理器上运行多个线程将交替地换入或者换出内存,这些线程是同时“存在"的,每个线程都处于执行过程中的某个状态,如果运行在多核处理器上,此时,程序中的每个线程都将分配到一个处理器核上,因此可以同时运行.

1.2 高并发( High Concurrency)

互联网分布式系统架构设计中必须考虑的因素之一,通常是指,通过设计保证系统能够同时并行处理很多请求.

1.3 区别与联系

并发: 多个线程操作相同的资源,保证线程安全,合理使用资源

高并发:服务能同时处理很多请求,提高程序性能

2 CPU 2.1 CPU 多级缓存

为什么需要CPU cache

CPU的频率太快了,快到主存跟不上
如此,在处理器时钟周期内,CPU常常需要等待主存,浪费资源。所以cache的出现,是为了缓解CPU和内存之间速度的不匹配问题(结构:cpu-> cache-> memory ).

CPU cache的意义
1) 时间局部性
如果某个数据被访问,那么在不久的将来它很可能被再次访问
2) 空间局部性

如果某个数据被访问,那么与它相邻的数据很快也可能被访问

2.2 缓存一致性(MESI)

用于保证多个 CPU cache 之间缓存共享数据的一致

M-modified被修改

该缓存行只被缓存在该 CPU 的缓存中,并且是被修改过的,与主存中数据是不一致的,需在未来某个时间点写回主存,该时间是允许在其他CPU 读取主存中相应的内存之前,当这里的值被写入主存之后,该缓存行状态变为 E

E-exclusive独享

缓存行只被缓存在该 CPU 的缓存中,未被修改过,与主存中数据一致
可在任何时刻当被其他 CPU读取该内存时变成 S 态,被修改时变为 M态

S-shared共享

该缓存行可被多个 CPU 缓存,与主存中数据一致

I-invalid无效

乱序执行优化

处理器为提高运算速度而做出违背代码原有顺序的优化

并发的优势与风险

3 项目准备 3.1 项目初始化



3.2 并发模拟-Jmeter压测





3.3 并发模拟-代码 CountDownLatch

Semaphore(信号量)


以上二者通常和线程池搭配

下面开始做并发模拟

</>复制代码

  1. package com.mmall.concurrency;
  2. import com.mmall.concurrency.annoations.NotThreadSafe;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.Semaphore;
  8. /**
  9. * @author shishusheng
  10. * @date 18/4/1
  11. */
  12. @Slf4j
  13. @NotThreadSafe
  14. public class ConcurrencyTest {
  15. /**
  16. * 请求总数
  17. */
  18. public static int clientTotal = 5000;
  19. /**
  20. * 同时并发执行的线程数
  21. */
  22. public static int threadTotal = 200;
  23. public static int count = 0;
  24. public static void main(String[] args) throws Exception {
  25. //定义线程池
  26. ExecutorService executorService = Executors.newCachedThreadPool();
  27. //定义信号量,给出允许并发的线程数目
  28. final Semaphore semaphore = new Semaphore(threadTotal);
  29. //统计计数结果
  30. final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
  31. //将请求放入线程池
  32. for (int i = 0; i < clientTotal ; i++) {
  33. executorService.execute(() -> {
  34. try {
  35. //信号量的获取
  36. semaphore.acquire();
  37. add();
  38. //释放
  39. semaphore.release();
  40. } catch (Exception e) {
  41. log.error("exception", e);
  42. }
  43. countDownLatch.countDown();
  44. });
  45. }
  46. countDownLatch.await();
  47. //关闭线程池
  48. executorService.shutdown();
  49. log.info("count:{}", count);
  50. }
  51. /**
  52. * 统计方法
  53. */
  54. private static void add() {
  55. count++;
  56. }
  57. }

运行发现结果随机,所以非线程安全

4线程安全性 4.1 线程安全性

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的

4.2 原子性 4.2.1 Atomic 包

AtomicXXX:CAS,Unsafe.compareAndSwapInt

提供了互斥访问,同一时刻只能有一个线程来对它进行操作

</>复制代码

  1. package com.mmall.concurrency.example.atomic;
  2. import com.mmall.concurrency.annoations.ThreadSafe;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.Semaphore;
  8. import java.util.concurrent.atomic.AtomicLong;
  9. /**
  10. * @author shishusheng
  11. */
  12. @Slf4j
  13. @ThreadSafe
  14. public class AtomicExample2 {
  15. /**
  16. * 请求总数
  17. */
  18. public static int clientTotal = 5000;
  19. /**
  20. * 同时并发执行的线程数
  21. */
  22. public static int threadTotal = 200;
  23. /**
  24. * 工作内存
  25. */
  26. public static AtomicLong count = new AtomicLong(0);
  27. public static void main(String[] args) throws Exception {
  28. ExecutorService executorService = Executors.newCachedThreadPool();
  29. final Semaphore semaphore = new Semaphore(threadTotal);
  30. final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
  31. for (int i = 0; i < clientTotal ; i++) {
  32. executorService.execute(() -> {
  33. try {
  34. System.out.println();
  35. semaphore.acquire();
  36. add();
  37. semaphore.release();
  38. } catch (Exception e) {
  39. log.error("exception", e);
  40. }
  41. countDownLatch.countDown();
  42. });
  43. }
  44. countDownLatch.await();
  45. executorService.shutdown();
  46. //主内存
  47. log.info("count:{}", count.get());
  48. }
  49. private static void add() {
  50. count.incrementAndGet();
  51. // count.getAndIncrement();
  52. }
  53. }

</>复制代码

  1. package com.mmall.concurrency.example.atomic;
  2. import com.mmall.concurrency.annoations.ThreadSafe;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.util.concurrent.atomic.AtomicReference;
  5. /**
  6. * @author shishusheng
  7. * @date 18/4/3
  8. */
  9. @Slf4j
  10. @ThreadSafe
  11. public class AtomicExample4 {
  12. private static AtomicReference count = new AtomicReference<>(0);
  13. public static void main(String[] args) {
  14. // 2
  15. count.compareAndSet(0, 2);
  16. // no
  17. count.compareAndSet(0, 1);
  18. // no
  19. count.compareAndSet(1, 3);
  20. // 4
  21. count.compareAndSet(2, 4);
  22. // no
  23. count.compareAndSet(3, 5);
  24. log.info("count:{}", count.get());
  25. }
  26. }

AtomicReference,AtomicReferenceFieldUpdater

AtomicBoolean

AtomicStampReference : CAS的 ABA 问题

4.2.2 锁

synchronized:依赖 JVM

修饰代码块:大括号括起来的代码,作用于调用的对象

修饰方法: 整个方法,作用于调用的对象

修饰静态方法:整个静态方法,作用于所有对象

</>复制代码

  1. package com.mmall.concurrency.example.count;
  2. import com.mmall.concurrency.annoations.ThreadSafe;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.Semaphore;
  8. /**
  9. * @author shishusheng
  10. */
  11. @Slf4j
  12. @ThreadSafe
  13. public class CountExample3 {
  14. /**
  15. * 请求总数
  16. */
  17. public static int clientTotal = 5000;
  18. /**
  19. * 同时并发执行的线程数
  20. */
  21. public static int threadTotal = 200;
  22. public static int count = 0;
  23. public static void main(String[] args) throws Exception {
  24. ExecutorService executorService = Executors.newCachedThreadPool();
  25. final Semaphore semaphore = new Semaphore(threadTotal);
  26. final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
  27. for (int i = 0; i < clientTotal ; i++) {
  28. executorService.execute(() -> {
  29. try {
  30. semaphore.acquire();
  31. add();
  32. semaphore.release();
  33. } catch (Exception e) {
  34. log.error("exception", e);
  35. }
  36. countDownLatch.countDown();
  37. });
  38. }
  39. countDownLatch.await();
  40. executorService.shutdown();
  41. log.info("count:{}", count);
  42. }
  43. private synchronized static void add() {
  44. count++;
  45. }
  46. }

synchronized 修正计数类方法

修饰类:括号括起来的部分,作用于所有对象

子类继承父类的被 synchronized 修饰方法时,是没有 synchronized 修饰的!!!

Lock: 依赖特殊的 CPU 指令,代码实现

4.2.3 对比

synchronized: 不可中断锁,适合竞争不激烈,可读性好

Lock: 可中断锁,多样化同步,竞争激烈时能维持常态

Atomic: 竞争激烈时能维持常态,比Lock性能好; 只能同步一

个值

4.3 可见性

一个线程对主内存的修改可以及时的被其他线程观察到

4.3.1 导致共享变量在线程间不可见的原因

线程交叉执行

重排序结合线程交叉执行

共享变量更新后的值没有在工作内存与主存间及时更新

4.3.2 可见性之synchronized

JMM关于synchronized的规定

线程解锁前,必须把共享变量的最新值刷新到主内存

线程加锁时,将清空工作内存中共享变量的值,从而使

用共享变量时需要从主内存中重新读取最新的值(加锁与解锁是同一把锁)

4.3.3 可见性之volatile

通过加入内存屏障和禁止重排序优化来实现

对volatile变量写操作时,会在写操作后加入一条store

屏障指令,将本地内存中的共享变量值刷新到主内存

对volatile变量读操作时,会在读操作前加入一条load

屏障指令,从主内存中读取共享变量


volatile使用

</>复制代码

  1. volatile boolean inited = false;
  2. //线程1:
  3. context = loadContext();
  4. inited= true;
  5. // 线程2:
  6. while( !inited ){
  7. sleep();
  8. }
  9. doSomethingWithConfig(context)
4.4 有序性

一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

JMM允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性

4.4.1 happens-before 规则 5发布对象



5.1 安全发布对象




</>复制代码

  1. package com.mmall.concurrency.example.singleton;
  2. import com.mmall.concurrency.annoations.NotThreadSafe;
  3. /**
  4. * 懒汉模式 -》 双重同步锁单例模式
  5. * 单例实例在第一次使用时进行创建
  6. * @author shishusheng
  7. */
  8. @NotThreadSafe
  9. public class SingletonExample4 {
  10. /**
  11. * 私有构造函数
  12. */
  13. private SingletonExample4() {
  14. }
  15. // 1、memory = allocate() 分配对象的内存空间
  16. // 2、ctorInstance() 初始化对象
  17. // 3、instance = memory 设置instance指向刚分配的内存
  18. // JVM和cpu优化,发生了指令重排
  19. // 1、memory = allocate() 分配对象的内存空间
  20. // 3、instance = memory 设置instance指向刚分配的内存
  21. // 2、ctorInstance() 初始化对象
  22. /**
  23. * 单例对象
  24. */
  25. private static SingletonExample4 instance = null;
  26. /**
  27. * 静态的工厂方法
  28. *
  29. * @return
  30. */
  31. public static SingletonExample4 getInstance() {
  32. // 双重检测机制 // B
  33. if (instance == null) {
  34. // 同步锁
  35. synchronized (SingletonExample4.class) {
  36. if (instance == null) {
  37. // A - 3
  38. instance = new SingletonExample4();
  39. }
  40. }
  41. }
  42. return instance;
  43. }
  44. }


7 AQS 7.1 介绍

使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架

利用了一个int类型表示状态

使用方法是继承

子类通过继承并通过实现它的方法管理其状态{acquire 和release} 的方法操纵状态

可以同时实现排它锁和共享锁模式(独占、共享)

同步组件

CountDownLatch

</>复制代码

  1. package com.mmall.concurrency.example.aqs;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.CountDownLatch;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. /**
  7. * @author shishusheng
  8. */
  9. @Slf4j
  10. public class CountDownLatchExample1 {
  11. private final static int threadCount = 200;
  12. public static void main(String[] args) throws Exception {
  13. ExecutorService exec = Executors.newCachedThreadPool();
  14. final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  15. for (int i = 0; i < threadCount; i++) {
  16. final int threadNum = i;
  17. exec.execute(() -> {
  18. try {
  19. test(threadNum);
  20. } catch (Exception e) {
  21. log.error("exception", e);
  22. } finally {
  23. countDownLatch.countDown();
  24. }
  25. });
  26. }
  27. countDownLatch.await();
  28. log.info("finish");
  29. exec.shutdown();
  30. }
  31. private static void test(int threadNum) throws Exception {
  32. Thread.sleep(100);
  33. log.info("{}", threadNum);
  34. Thread.sleep(100);
  35. }
  36. }

</>复制代码

  1. package com.mmall.concurrency.example.aqs;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.CountDownLatch;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.TimeUnit;
  7. /**
  8. * 指定时间内处理任务
  9. *
  10. * @author shishusheng
  11. *
  12. */
  13. @Slf4j
  14. public class CountDownLatchExample2 {
  15. private final static int threadCount = 200;
  16. public static void main(String[] args) throws Exception {
  17. ExecutorService exec = Executors.newCachedThreadPool();
  18. final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  19. for (int i = 0; i < threadCount; i++) {
  20. final int threadNum = i;
  21. exec.execute(() -> {
  22. try {
  23. test(threadNum);
  24. } catch (Exception e) {
  25. log.error("exception", e);
  26. } finally {
  27. countDownLatch.countDown();
  28. }
  29. });
  30. }
  31. countDownLatch.await(10, TimeUnit.MILLISECONDS);
  32. log.info("finish");
  33. exec.shutdown();
  34. }
  35. private static void test(int threadNum) throws Exception {
  36. Thread.sleep(100);
  37. log.info("{}", threadNum);
  38. }
  39. }
Semaphore用法



CycliBarrier

</>复制代码

  1. package com.mmall.concurrency.example.aqs;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.CyclicBarrier;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. /**
  7. * @author shishusheng
  8. */
  9. @Slf4j
  10. public class CyclicBarrierExample1 {
  11. private static CyclicBarrier barrier = new CyclicBarrier(5);
  12. public static void main(String[] args) throws Exception {
  13. ExecutorService executor = Executors.newCachedThreadPool();
  14. for (int i = 0; i < 10; i++) {
  15. final int threadNum = i;
  16. Thread.sleep(1000);
  17. executor.execute(() -> {
  18. try {
  19. race(threadNum);
  20. } catch (Exception e) {
  21. log.error("exception", e);
  22. }
  23. });
  24. }
  25. executor.shutdown();
  26. }
  27. private static void race(int threadNum) throws Exception {
  28. Thread.sleep(1000);
  29. log.info("{} is ready", threadNum);
  30. barrier.await();
  31. log.info("{} continue", threadNum);
  32. }
  33. }

</>复制代码

  1. package com.mmall.concurrency.example.aqs;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.CyclicBarrier;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.TimeUnit;
  7. /**
  8. * @author shishusheng
  9. */
  10. @Slf4j
  11. public class CyclicBarrierExample2 {
  12. private static CyclicBarrier barrier = new CyclicBarrier(5);
  13. public static void main(String[] args) throws Exception {
  14. ExecutorService executor = Executors.newCachedThreadPool();
  15. for (int i = 0; i < 10; i++) {
  16. final int threadNum = i;
  17. Thread.sleep(1000);
  18. executor.execute(() -> {
  19. try {
  20. race(threadNum);
  21. } catch (Exception e) {
  22. log.error("exception", e);
  23. }
  24. });
  25. }
  26. executor.shutdown();
  27. }
  28. private static void race(int threadNum) throws Exception {
  29. Thread.sleep(1000);
  30. log.info("{} is ready", threadNum);
  31. try {
  32. barrier.await(2000, TimeUnit.MILLISECONDS);
  33. } catch (Exception e) {
  34. log.warn("BarrierException", e);
  35. }
  36. log.info("{} continue", threadNum);
  37. }
  38. }

</>复制代码

  1. package com.mmall.concurrency.example.aqs;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.Semaphore;
  6. /**
  7. * @author shishusheng
  8. */
  9. @Slf4j
  10. public class SemaphoreExample3 {
  11. private final static int threadCount = 20;
  12. public static void main(String[] args) throws Exception {
  13. ExecutorService exec = Executors.newCachedThreadPool();
  14. final Semaphore semaphore = new Semaphore(3);
  15. for (int i = 0; i < threadCount; i++) {
  16. final int threadNum = i;
  17. exec.execute(() -> {
  18. try {
  19. // 尝试获取一个许可
  20. if (semaphore.tryAcquire()) {
  21. test(threadNum);
  22. // 释放一个许可
  23. semaphore.release();
  24. }
  25. } catch (Exception e) {
  26. log.error("exception", e);
  27. }
  28. });
  29. }
  30. exec.shutdown();
  31. }
  32. private static void test(int threadNum) throws Exception {
  33. log.info("{}", threadNum);
  34. Thread.sleep(1000);
  35. }
  36. }
9 线程池 9.1 newCachedThreadPool

9.2 newFixedThreadPool

9.3 newSingleThreadExecutor

看出是顺序执行的

9.4 newScheduledThreadPool


10 死锁


文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72545.html

相关文章

  • Java并发Java并发编程与高并发基础概念

    摘要:笔记来源并发编程与高并发解决方案并发基础综述多级缓存缓存一致性乱序执行优化内存模型规定抽象结构同步八种操作及规则并发的优势与风险并发与高并发基本概念基本概念并发同时拥有两个或者多个线程,如果程序在单核处理器上运行,多个线程将交替地换入或者换 笔记来源:【IMOOC】Java并发编程与高并发解决方案 并发基础 综述: CPU多级缓存:缓存一致性、乱序执行优化 Java内存模型:JM...

    stackfing 评论0 收藏0
  • 做IT这几年,我整理了这些干货想要送给你!

    摘要:资源获取方式根据下面的索引,大家可以选择自己需要的资源,然后在松哥公众号牧码小子后台回复对应的口令,就可以获取到资源的百度云盘下载地址。公众号二维码如下另外本文会定期更新,松哥有新资源的时候会及时分享给大家,欢迎各位小伙伴保持关注。 没有一条路是容易的,特别是转行计算机这条路。 松哥接触过很多转行做开发的小伙伴,我了解到很多转行人的不容易,记得松哥大二时刚刚决定转行计算机,完全不知道这...

    王晗 评论0 收藏0
  • 从小白程序员一路晋升为大厂高级技术专家我看过哪些书籍?(建议收藏)

    摘要:大家好,我是冰河有句话叫做投资啥都不如投资自己的回报率高。马上就十一国庆假期了,给小伙伴们分享下,从小白程序员到大厂高级技术专家我看过哪些技术类书籍。 大家好,我是...

    sf_wangchong 评论0 收藏0
  • 并发编程 - 探索

    摘要:并发表示在一段时间内有多个动作存在。并发带来的问题在享受并发编程带来的高性能高吞吐量的同时,也会因为并发编程带来一些意想不到弊端。并发过程中多线程之间的切换调度,上下文的保存恢复等都会带来额外的线程切换开销。 0x01 什么是并发 要理解并发首选我们来区分下并发和并行的概念。 并发:表示在一段时间内有多个动作存在。 并行:表示在同一时间点有多个动作同时存在。 例如:此刻我正在写博客,但...

    pcChao 评论0 收藏0
  • Java并发】线程安全性

    摘要:另一个是使用锁的机制来处理线程之间的原子性。依赖于去实现锁,因此在这个关键字作用对象的作用范围内,都是同一时刻只能有一个线程对其进行操作的。 线程安全性 定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。 线程安全性主要体现在三个方面:原子性、可见性...

    刘玉平 评论0 收藏0

发表评论

0条评论

SimpleTriangle

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<