资讯专栏INFORMATION COLUMN

批量任务体现多线程的威力!

dreamans / 890人阅读

摘要:背景对于多线程的理解不是非常深刻,工作中用到多线程代码的机会也不多,前不久遇到了一个使用场景,通过编码实现后对于多线程的理解和应用有了更加深刻的理解。多线程发送短信中的一个核心要点是,将全部手机号码拆分成多个组后,分配给每个线程进行执行。

背景

对于多线程的理解不是非常深刻,工作中用到多线程代码的机会也不多,前不久遇到了一个使用场景,通过编码实现后对于多线程的理解和应用有了更加深刻的理解。场景如下:现有给用户发送产品调研的需求,运营的同事拿来了一个Excel文件,要求给Excel里面大约六万个手机号发送调研短信。

最简单的方法就是一个循环然后单线程顺序发送,但是核心问题在于,给短信运营商发短信的接口响应时间较长,假设平均100ms的响应时间,那么单线程发送的话需要6万*0.1秒=6000秒。显然这个时间是不能接受的,运营商系统的发送接口我们是不能优化的,只得增强自己的发送和处理能力才能尽快的完成任务。

批量发短信 读取Excel中的信息 包依赖

工具类代码,Maven中引入如下两个包

</>复制代码

  1. org.apache.poi
  2. poi-ooxml
  3. 3.17
  4. org.apache.xmlbeans
  5. xmlbeans
  6. 2.6.0
读取Excel的工具类代码

</>复制代码

  1. /**
  2. * 读取Excel的文件信息
  3. *
  4. * @param fileName
  5. */
  6. public static void readFromExcel(String fileName) {
  7. InputStream is = null;
  8. try {
  9. is = new FileInputStream(fileName);
  10. XSSFWorkbook workbook = new XSSFWorkbook(is);
  11. XSSFSheet sheet = workbook.getSheetAt(0);
  12. int num = 0;
  13. // 循环行Row
  14. for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) {
  15. XSSFRow row = sheet.getRow(rowNum);
  16. String phoneNumber = getStringValueFromCell(row.getCell(0)).trim();
  17. phoneList.add(phoneNumber);
  18. }
  19. System.out.println(num);
  20. } catch (FileNotFoundException e) {
  21. e.printStackTrace();
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. /**
  27. * 读取Excel里面Cell内容
  28. *
  29. * @param cell
  30. * @return
  31. */
  32. private static String getStringValueFromCell(XSSFCell cell) {
  33. // 单元格内的时间格式
  34. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
  35. // 单元格内的数字类型
  36. DecimalFormat decimalFormat = new DecimalFormat("#.#####");
  37. // 单元格默认为空
  38. String cellValue = "";
  39. if (cell == null) {
  40. return cellValue;
  41. }
  42. // 按类型读取
  43. if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) {
  44. cellValue = cell.getStringCellValue();
  45. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) {
  46. // 日期转为时间形式
  47. if (DateUtil.isCellDateFormatted(cell)) {
  48. double d = cell.getNumericCellValue();
  49. Date date = DateUtil.getJavaDate(d);
  50. cellValue = dateFormat.format(date);
  51. } else {
  52. // 其他转为数字
  53. cellValue = decimalFormat.format((cell.getNumericCellValue()));
  54. }
  55. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) {
  56. cellValue = "";
  57. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) {
  58. cellValue = String.valueOf(cell.getBooleanCellValue());
  59. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) {
  60. cellValue = "";
  61. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) {
  62. cellValue = cell.getCellFormula().toString();
  63. }
  64. return cellValue;
  65. }  
模拟运营商发送短信的方法

</>复制代码

  1. /**
  2. * 外部接口耗时长,通过多线程增强
  3. *
  4. * @param userPhone
  5. */
  6. public void sendMsgToPhone(String userPhone) {
  7. try {
  8. Thread.sleep(SEND_COST_TIME);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. System.out.println("send message to : " + userPhone);
  13. }
多线程发短信 简单的单线程发送

</>复制代码

  1. /**
  2. * 单线程发送
  3. *
  4. * @param phoneList
  5. * @return
  6. */
  7. private long singleThread(List phoneList) {
  8. long start = System.currentTimeMillis();
  9. /*// 直接主线程执行
  10. for (String phoneNumber : phoneList) {
  11. threadOperation.sendMsgToPhone(phoneNumber);
  12. }*/
  13. SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList);
  14. smet.start();
  15. long totalTime = System.currentTimeMillis() - start;
  16. System.out.println("单线程发送总时间:" + totalTime);
  17. return totalTime;
  18. }

对于大批量发短信的场景,如果使用单线程将全部一千个号码发送完毕的话,大约需要103132ms,可见效率低下,耗费时间较长。

多线程发送短信中的一个核心要点是,将全部手机号码拆分成多个组后,分配给每个线程进行执行。

两个线程的示例

</>复制代码

  1. /**
  2. * 两个线程发送
  3. *
  4. * @param phoneList
  5. * @return
  6. */
  7. private long twoThreads(List phoneList) {
  8. long start = System.currentTimeMillis();
  9. List list1 = phoneList.subList(0, phoneList.size() / 2);
  10. List list2 = phoneList.subList(phoneList.size() / 2, phoneList.size());
  11. SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1);
  12. smet.start();
  13. SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2);
  14. smet1.start();
  15. return 0;
  16. }
另一种数据分组方式

</>复制代码

  1. /**
  2. * 另外一种分配方式
  3. *
  4. * @param phoneList
  5. */
  6. private void otherThread(List phoneList) {
  7. for (int threadNo = 0; threadNo < 10; threadNo++) {
  8. int numbersPerThread = 10;
  9. List list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 10);
  10. SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list);
  11. smet.start();
  12. if (list.size() < numbersPerThread) {
  13. break;
  14. }
  15. }
  16. }
线程池发送

</>复制代码

  1. /**
  2. * 线程池发送
  3. *
  4. * @param phoneList
  5. * @return
  6. */
  7. private void threadPool(List phoneList) {
  8. for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
  9. int numbersPerThread = 10;
  10. List list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 10);
  11. threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list));
  12. }
  13. threadOperation.executorService.shutdown();
  14. }
使用Callable发送

</>复制代码

  1. /**
  2. * 多线程发送
  3. *
  4. * @param phoneList
  5. * @return
  6. */
  7. private void multiThreadSend(List phoneList) {
  8. List> futures = new ArrayList<>();
  9. for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
  10. int numbersPerThread = 100;
  11. List list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 100);
  12. Future future = threadOperation.executorService.submit(threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo)));
  13. futures.add(future);
  14. }
  15. for (Future future : futures) {
  16. try {
  17. System.out.println(future.get());
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. } catch (ExecutionException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. threadOperation.executorService.shutdown();
  25. }

使用多线程发送,将发送任务进行分割然后分配给每个线程执行,执行完毕需要10266ms,可见执行效率明显提升,消耗时间明显缩短。

完整代码

</>复制代码

  1. package com.lingyejun.tick.authenticator;
  2. import org.apache.poi.ss.usermodel.DateUtil;
  3. import org.apache.poi.xssf.usermodel.XSSFCell;
  4. import org.apache.poi.xssf.usermodel.XSSFRow;
  5. import org.apache.poi.xssf.usermodel.XSSFSheet;
  6. import org.apache.poi.xssf.usermodel.XSSFWorkbook;
  7. import java.io.FileInputStream;
  8. import java.io.FileNotFoundException;
  9. import java.io.IOException;
  10. import java.io.InputStream;
  11. import java.text.DecimalFormat;
  12. import java.text.SimpleDateFormat;
  13. import java.util.*;
  14. import java.util.concurrent.*;
  15. public class ThreadOperation {
  16. // 发短信的同步等待时间
  17. private static final long SEND_COST_TIME = 100L;
  18. // 手机号文件
  19. private static final String FILE_NAME = "/Users/lingye/Downloads/phone_number.xlsx";
  20. // 手机号列表
  21. private static List phoneList = new ArrayList<>();
  22. // 单例对象
  23. private static volatile ThreadOperation threadOperation;
  24. // 线程个数
  25. private static final int THREAD_POOL_SIZE = 10;
  26. // 初始化线程池
  27. private ExecutorService executorService = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE,
  28. 0L, TimeUnit.MILLISECONDS,
  29. new LinkedBlockingQueue());
  30. public ThreadOperation() {
  31. // 从本地文件中读取手机号码
  32. readFromExcel(FILE_NAME);
  33. }
  34. public static void main(String[] args) {
  35. ThreadOperation threadOperation = getInstance();
  36. //threadOperation.singleThread(phoneList);
  37. threadOperation.multiThreadSend(phoneList);
  38. }
  39. /**
  40. * 单例获取对象
  41. *
  42. * @return
  43. */
  44. public static ThreadOperation getInstance() {
  45. if (threadOperation == null) {
  46. synchronized (ThreadOperation.class) {
  47. if (threadOperation == null) {
  48. threadOperation = new ThreadOperation();
  49. }
  50. }
  51. }
  52. return threadOperation;
  53. }
  54. /**
  55. * 读取Excel的文件信息
  56. *
  57. * @param fileName
  58. */
  59. public static void readFromExcel(String fileName) {
  60. InputStream is = null;
  61. try {
  62. is = new FileInputStream(fileName);
  63. XSSFWorkbook workbook = new XSSFWorkbook(is);
  64. XSSFSheet sheet = workbook.getSheetAt(0);
  65. int num = 0;
  66. // 循环行Row
  67. for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) {
  68. XSSFRow row = sheet.getRow(rowNum);
  69. String phoneNumber = getStringValueFromCell(row.getCell(0)).trim();
  70. phoneList.add(phoneNumber);
  71. }
  72. System.out.println(num);
  73. } catch (FileNotFoundException e) {
  74. e.printStackTrace();
  75. } catch (IOException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. /**
  80. * 读取Excel里面Cell内容
  81. *
  82. * @param cell
  83. * @return
  84. */
  85. private static String getStringValueFromCell(XSSFCell cell) {
  86. // 单元格内的时间格式
  87. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
  88. // 单元格内的数字类型
  89. DecimalFormat decimalFormat = new DecimalFormat("#.#####");
  90. // 单元格默认为空
  91. String cellValue = "";
  92. if (cell == null) {
  93. return cellValue;
  94. }
  95. // 按类型读取
  96. if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) {
  97. cellValue = cell.getStringCellValue();
  98. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) {
  99. // 日期转为时间形式
  100. if (DateUtil.isCellDateFormatted(cell)) {
  101. double d = cell.getNumericCellValue();
  102. Date date = DateUtil.getJavaDate(d);
  103. cellValue = dateFormat.format(date);
  104. } else {
  105. // 其他转为数字
  106. cellValue = decimalFormat.format((cell.getNumericCellValue()));
  107. }
  108. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) {
  109. cellValue = "";
  110. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) {
  111. cellValue = String.valueOf(cell.getBooleanCellValue());
  112. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) {
  113. cellValue = "";
  114. } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) {
  115. cellValue = cell.getCellFormula().toString();
  116. }
  117. return cellValue;
  118. }
  119. /**
  120. * 外部接口耗时长,通过多线程增强
  121. *
  122. * @param userPhone
  123. */
  124. public void sendMsgToPhone(String userPhone) {
  125. try {
  126. Thread.sleep(SEND_COST_TIME);
  127. } catch (InterruptedException e) {
  128. e.printStackTrace();
  129. }
  130. System.out.println("send message to : " + userPhone);
  131. }
  132. /**
  133. * 单线程发送
  134. *
  135. * @param phoneList
  136. * @return
  137. */
  138. private long singleThread(List phoneList) {
  139. long start = System.currentTimeMillis();
  140. /*// 直接主线程执行
  141. for (String phoneNumber : phoneList) {
  142. threadOperation.sendMsgToPhone(phoneNumber);
  143. }*/
  144. SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList);
  145. smet.start();
  146. long totalTime = System.currentTimeMillis() - start;
  147. System.out.println("单线程发送总时间:" + totalTime);
  148. return totalTime;
  149. }
  150. /**
  151. * 另外一种分配方式
  152. *
  153. * @param phoneList
  154. */
  155. private void otherThread(List phoneList) {
  156. for (int threadNo = 0; threadNo < 10; threadNo++) {
  157. int numbersPerThread = 10;
  158. List list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 10);
  159. SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list);
  160. smet.start();
  161. if (list.size() < numbersPerThread) {
  162. break;
  163. }
  164. }
  165. }
  166. /**
  167. * 两个线程发送
  168. *
  169. * @param phoneList
  170. * @return
  171. */
  172. private long twoThreads(List phoneList) {
  173. long start = System.currentTimeMillis();
  174. List list1 = phoneList.subList(0, phoneList.size() / 2);
  175. List list2 = phoneList.subList(phoneList.size() / 2, phoneList.size());
  176. SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1);
  177. smet.start();
  178. SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2);
  179. smet1.start();
  180. return 0;
  181. }
  182. /**
  183. * 线程池发送
  184. *
  185. * @param phoneList
  186. * @return
  187. */
  188. private void threadPool(List phoneList) {
  189. for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
  190. int numbersPerThread = 10;
  191. List list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 10);
  192. threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list));
  193. }
  194. threadOperation.executorService.shutdown();
  195. }
  196. /**
  197. * 多线程发送
  198. *
  199. * @param phoneList
  200. * @return
  201. */
  202. private void multiThreadSend(List phoneList) {
  203. List> futures = new ArrayList<>();
  204. for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
  205. int numbersPerThread = 100;
  206. List list = phoneList.subList(threadNo * numbersPerThread, (threadNo * numbersPerThread) + 100);
  207. Future future = threadOperation.executorService.submit(threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo)));
  208. futures.add(future);
  209. }
  210. for (Future future : futures) {
  211. try {
  212. System.out.println(future.get());
  213. } catch (InterruptedException e) {
  214. e.printStackTrace();
  215. } catch (ExecutionException e) {
  216. e.printStackTrace();
  217. }
  218. }
  219. threadOperation.executorService.shutdown();
  220. }
  221. public class SendMsgExtendThread extends Thread {
  222. private List numberListByThread;
  223. public SendMsgExtendThread(List numberList) {
  224. numberListByThread = numberList;
  225. }
  226. @Override
  227. public void run() {
  228. long startTime = System.currentTimeMillis();
  229. for (int i = 0; i < numberListByThread.size(); i++) {
  230. System.out.print("no." + (i + 1));
  231. sendMsgToPhone(numberListByThread.get(i));
  232. }
  233. System.out.println("== single thread send " + numberListByThread.size() + "execute time:" + (System.currentTimeMillis() - startTime) + " ms");
  234. }
  235. }
  236. public class SendMsgImplCallable implements Callable {
  237. private List numberListByThread;
  238. private String threadName;
  239. public SendMsgImplCallable(List numberList, String threadName) {
  240. numberListByThread = numberList;
  241. this.threadName = threadName;
  242. }
  243. @Override
  244. public Long call() throws Exception {
  245. Long startMills = System.currentTimeMillis();
  246. for (String number : numberListByThread) {
  247. sendMsgToPhone(number);
  248. }
  249. Long endMills = System.currentTimeMillis();
  250. return endMills - startMills;
  251. }
  252. }
  253. }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77405.html

相关文章

  • (一)线程发展历史

    摘要:从多线程的发展来看,可以操作系统的发展分为三个历史阶段真空管和穿孔卡片晶体管和批处理系统集成电路和多道程序设计最早的计算机只能解决简单的数学运算问题,比如正弦余弦等。我们用了比较长的篇幅介绍了进程线程发展的历史。 专题简介 作为一个合格的Java程序员,必须要对并发编程有一个深层次的了解,在很多互联网企业都会重点考察这一块。可能很多工作3年以上的Java程序员对于这一领域几乎没有太多研...

    noONE 评论0 收藏0
  • 在浏览器中快速探测IP端口是否开放

    摘要:探测端口开放原理就是向目标发送请求,看是否有回应。端口判定为不通。扫描批量目标扫描批量目标使用的并发队列功能,去执行的执行单个任务,在扫描前做了一些额外的工作把浏览器屏蔽的端口过滤掉了,收到的状态就是。 0×00 前言 前两天 freebuf上的的XSS到内网的公开课很受启发,从一个页面到局域网,威力着实增强不少 公开课上检测内网 IP 实现方式用的是 img 标签,加载网站的 fav...

    jackwang 评论0 收藏0
  • 在浏览器中快速探测IP端口是否开放

    摘要:探测端口开放原理就是向目标发送请求,看是否有回应。端口判定为不通。扫描批量目标扫描批量目标使用的并发队列功能,去执行的执行单个任务,在扫描前做了一些额外的工作把浏览器屏蔽的端口过滤掉了,收到的状态就是。 0×00 前言 前两天 freebuf上的的XSS到内网的公开课很受启发,从一个页面到局域网,威力着实增强不少 公开课上检测内网 IP 实现方式用的是 img 标签,加载网站的 fav...

    jlanglang 评论0 收藏0
  • 消息队列二三事

    摘要:但是我们明显能感觉到这会降低吞吐量,因为消息不能并行投递了,而且会阻塞等待,也没法发挥的威力。 最近在看kafka的代码,就免不了想看看消息队列的一些要点:服务质量(QOS)、性能、扩展性等等,下面一一探索这些概念,并谈谈在特定的消息队列如kafka或者mosquito中是如何具体实现这些概念的。 服务质量 服务语义 服务质量一般可以分为三个级别,下面说明它们不同语义。 At most...

    dack 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<