资讯专栏INFORMATION COLUMN

7-并发编程

Cheriselalala / 3460人阅读

摘要:进程线程切换都需要使用一定的时间。子进程在中,如果要运行系统命令,会使用来运行,官方建议使用方法来运行系统命令,更高级的用法是直接使用其接口。

多线程 简单示例

对于CPU计算密集型的任务,python的多线程跟单线程没什么区别,甚至有可能会更慢,但是对于IO密集型的任务,比如http请求这类任务,python的多线程还是有用处。在日常的使用中,经常会结合多线程和队列一起使用,比如,以爬取simpledestops 网站壁纸为例:

</>复制代码

  1. import os
  2. from datetime import datetime
  3. from queue import Queue
  4. from threading import Thread
  5. import requests
  6. requests.packages.urllib3.disable_warnings()
  7. from bs4 import BeautifulSoup
  8. import re
  9. if not os.path.exists("img"):
  10. os.mkdir("img")
  11. # 声明一个队列
  12. Q = Queue()
  13. def producer(pages):
  14. for page in range(1,pages+1):
  15. # 提取每一页的图片 url 加入队列
  16. print("[-] 收集第 {} 页".format(str(page)))
  17. url = "http://simpledesktops.com/browse/"+str(page)+"/"
  18. r = requests.get(url,verify=False)
  19. html = r.text
  20. soup = BeautifulSoup(html,"html.parser")
  21. try:
  22. imgs = soup.find_all("img")
  23. for img in imgs:
  24. img_url = img["src"]
  25. Q.put(img_url)
  26. except:
  27. pass
  28. def worker(i):
  29. # 取出队列的值,按顺序取,下载图片
  30. while not Q.empty():
  31. img_url = Q.get()
  32. text = re.search("(http://static.simpledesktops.com/uploads/desktops/d+/d+/d+/(.*?png)).*?png",img_url)
  33. new_img_url = text.group(1)
  34. r = requests.get(new_img_url,verify=False)
  35. path = "img/"+text.group(2)
  36. print("[-] 线程 {} 开始下载 {} 开始时间:{}".format(i,text.group(2),datetime.now()))
  37. with open(path,"wb") as f:
  38. f.write(r.content)
  39. Q.all_tasks_done
  40. if __name__ =="__main__":
  41. # 一定要将数据加入队列,否则是启动不了的,因为队列为空
  42. producer(50)
  43. # 线程的声明
  44. ts = [Thread(target=worker,args=(i,)) for i in range(50)]
  45. for t in ts:
  46. t.start()
  47. for t in ts:
  48. t.join()

我们使用start启动多线程,使用 join 防止主线程退出的时候结束所有的线程,使用队列有序的且并发的下载壁纸。 仔细观察就会发现代码其实有迹可循,更改其中的爬取内容的部分代码后,我们就可以应用于爬取别的网站。

ThreadLocal

按照道理来说,多线程中,每个线程的处理逻辑应该是相同的,但是其处理的数据,却不一定是相同的,如果数据是全局的,那么我们就需要加锁,防止数据混乱,这样一来就会麻烦很多,所以线程处理的数据最好是局部的、其他线程不能干扰的。

代码示例:

</>复制代码

  1. # coding: utf-8
  2. import threading,time
  3. import requests
  4. requests.packages.urllib3.disable_warnings()
  5. from datetime import datetime
  6. local_variable = threading.local()
  7. # 逻辑处理函数
  8. def worker():
  9. print("每个线程启动的时间: ",datetime.now())
  10. time.sleep(10)
  11. url = local_variable.url
  12. r = requests.get(url,verify=False)
  13. print(r.url,datetime.strftime(datetime.now(),"%H:%M:%S"),threading.current_thread().name)
  14. # 线程处理函数
  15. def process_thread(url):
  16. local_variable.url = url
  17. worker()
  18. if __name__ == "__main__":
  19. ts = [threading.Thread(target=process_thread,args=(url,))for url in ["https://www.baidu.com","https://www.google.com","https://www.bing.com"]]
  20. for t in ts:
  21. t.start()
  22. for t in ts:
  23. t.join()

输出:

</>复制代码

  1. 线程Thread-1 启动的时间:2019-01-09 11:25:18.339631
  2. 线程Thread-2 启动的时间:2019-01-09 11:25:18.340646
  3. 线程Thread-3 启动的时间:2019-01-09 11:25:18.342635
  4. https://www.baidu.com/ 11:25:28 Thread-1
  5. https://cn.bing.com/ 11:25:29 Thread-3
  6. https://www.google.com/ 11:25:29 Thread-2
多进程 进程池

python中使用 multiprocessing 来创建多进程,如果要创建多个子进程,则需要使用 进程池 Pool 来创建,一个简单的例子:

</>复制代码

  1. from multiprocessing import Pool
  2. import os
  3. from datetime import datetime
  4. """
  5. @param {type} int
  6. @return: None
  7. """
  8. def print_num(i):
  9. print("进程{} 打印 {}".format(os.getpid(),i))
  10. if __name__ == "__main__":
  11. p = Pool(4)
  12. for i in range(100):
  13. p.apply_async(print_num,args=(i,))
  14. # 关闭进程池,不再加入进程
  15. p.close()
  16. # 防止主进程结束,子进程无法继续运行
  17. p.join()

输出:

</>复制代码

  1. 进程2624 打印 0
  2. 进程2625 打印 1
  3. 进程2626 打印 3
  4. 进程2627 打印 2
  5. 进程2624 打印 4
  6. 进程2625 打印 5
  7. 进程2626 打印 6
  8. 进程2627 打印 7
  9. 进程2624 打印 8
  10. ...

进程可以实现并行运行代码,但是一旦进程太多,CPU运行不过来也是需要进行等待,用了多进程以后,就可以不使用队列了,也可以实现多线程的效果

除此之外,还可以多进程和多线程结合起来使用,一个简单的例子

</>复制代码

  1. from multiprocessing import Pool
  2. import threading
  3. import os,time
  4. import queue
  5. from datetime import datetime
  6. def producer(i):
  7. Q = queue.Queue()
  8. start = 25*(i-1)
  9. end = 100 * int(i / 4)
  10. for x in range(start,end):
  11. Q.put(x)
  12. return Q
  13. def process_thread(Q,j):
  14. while not Q.empty():
  15. item = Q.get()
  16. print("进程{}: 线程{} 正在消耗:{} 时间:{}".format(os.getpid(),j,item,datetime.now()))
  17. Q.all_tasks_done
  18. def tasks(i):
  19. Q = producer(i)
  20. ts = [threading.Thread(target=process_thread,args=(Q,j)) for j in range(10)]
  21. for t in ts:
  22. t.start()
  23. for t in ts:
  24. t.join()
  25. if __name__ == "__main__":
  26. start = datetime.now()
  27. p = Pool(4)
  28. for i in range(1,5):
  29. print(i)
  30. p.apply_async(tasks,args=(i,))
  31. p.close()
  32. p.join()
  33. end = datetime.now()
  34. waste = end-start
  35. print("一共花费了: {}".format(waste))

先将要处理的数据,填进队列,然后创建4个进程,10个线程运行。 其输出为:

</>复制代码

  1. """
  2. (venv) C:projectlibraries-python>python bulit-in-libraries
  3. hreadingmultithreading.py
  4. 进程17020: 线程0 正在消耗:1 时间:2019-01-09 12:50:48.701523
  5. 进程17020: 线程1 正在消耗:2 时间:2019-01-09 12:50:48.703521
  6. 进程17020: 线程3 正在消耗:4 时间:2019-01-09 12:50:48.704365
  7. 进程17020: 线程2 正在消耗:3 时间:2019-01-09 12:50:48.704365
  8. 进程2804: 线程0 正在消耗:5 时间:2019-01-09 12:50:48.706349
  9. 进程2804: 线程1 正在消耗:6 时间:2019-01-09 12:50:48.707352
  10. 进程2804: 线程4 正在消耗:9 时间:2019-01-09 12:50:48.708355
  11. 进程2804: 线程3 正在消耗:8 时间:2019-01-09 12:50:48.708355
  12. 进程2804: 线程2 正在消耗:7 时间:2019-01-09 12:50:48.708355
  13. 进程16060: 线程0 正在消耗:10 时间:2019-01-09 12:50:48.728409
  14. 进程16060: 线程1 正在消耗:11 时间:2019-01-09 12:50:48.730413
  15. 进程16060: 线程4 正在消耗:14 时间:2019-01-09 12:50:48.732418
  16. 进程16060: 线程3 正在消耗:13 时间:2019-01-09 12:50:48.732418
  17. 进程16060: 线程2 正在消耗:12 时间:2019-01-09 12:50:48.732418
  18. 进程7588: 线程3 正在消耗:18 时间:2019-01-09 12:50:48.761808
  19. 进程7588: 线程4 正在消耗:19 时间:2019-01-09 12:50:48.761808
  20. 进程7588: 线程0 正在消耗:15 时间:2019-01-09 12:50:48.761808
  21. 进程7588: 线程1 正在消耗:16 时间:2019-01-09 12:50:48.761808
  22. 进程7588: 线程2 正在消耗:17 时间:2019-01-09 12:50:48.761808

后来实验了打印出10万个数,4个进程,每个进程400个线程,花费了39秒。而400个线程,只花费了17秒。所以有时候,也并不是多就是好。进程线程切换都需要使用一定的时间。

子进程

在python中,如果要运行系统命令,会使用 subprocess 来运行,官方建议使用run 方法来运行系统命令,更高级的用法是直接使用其 Popen 接口。
其函数格式为:

</>复制代码

  1. subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)

可以看几个简单的例子:

直接使用

</>复制代码

  1. import subprocess
  2. subprocess.run(["ls","-al"])

在python3.7 之前,默认系统命令执行的结果(输出/错误)不存在stdout/stderr 里面,需要设置 capture_output=True,而在python3.6 版本,如果你需要使用执行的结果,你就需要设置 stdout. 如下所示

</>复制代码

  1. # python 3.6
  2. >>> a = subprocess.run(["ls","-al"],stdout=subprocess.PIPE)
  3. >>> a.stdout
  4. # python3.7
  5. >>> a = subprocess.run(["ls","-al"],capture_output=True)
  6. >>> a.stdout

所以可以看出python3.7 又做了一层封装,为了让大家使用更上一层的接口。可以看一下几个参数的含义为:

</>复制代码

  1. args 列表,为shell命令
  2. shell boolean值, 设置后,args可以直接接受shell命令
  3. capture_output = True , 设置后,stdout/stderr会存储值
  4. check=True, 设置后,如果程序异常退出,会跑出一个CalledProcessError异常
  5. cwd 是工作目录,可以为str,或者path-like 类
高级使用

Popen的构造函数:

</>复制代码

  1. class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=(), *, encoding=None, errors=None)

一个简单的例子

</>复制代码

  1. p = subprocess.Popen(["ls","-al"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

其次,通过Popen.communicate() ,子进程可以在启动了以后,还可以进行参数的输入

</>复制代码

  1. import subprocess
  2. print("$ nslookup")
  3. p = subprocess.Popen(["nslookup"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  4. output, err = p.communicate(b"set q=mx
  5. python.org
  6. exit
  7. ")
  8. print(output.decode("utf-8"))
  9. print("Exit code:", p.returncode)
  10. 其输出:
  11. $ nslookup
  12. Server: 192.168.19.4
  13. Address: 192.168.19.4#53
  14. Non-authoritative answer:
  15. python.org mail exchanger = 50 mail.python.org.
  16. Authoritative answers can be found from:
  17. mail.python.org internet address = 82.94.164.166
  18. mail.python.org has AAAA address 2001:888:2000:d::a6
  19. Exit code: 0
分布式多进程

python的分布式接口简单,使用起来也十分简单,可以参考廖雪峰的教程,需要的时候,修改代码,即可完成属于自己的分布式程序

这里贴出代码:

</>复制代码

  1. # master
  2. import random,time,queue
  3. from multiprocessing.managers import BaseManager
  4. task_queue = queue.Queue()
  5. result_queue = queue.Queue()
  6. class QueueManager(BaseManager):
  7. pass
  8. QueueManager.register("get_task_queue",callable=lambda:task_queue)
  9. QueueManager.register("get_result_queue",callable=lambda:result_queue)
  10. manager = QueueManager(address=("",5000),authkey=b"abc")
  11. manager.start()
  12. tasks = manager.get_task_queue()
  13. results = manager.get_result_queue()
  14. for i in range(10):
  15. n = random.randint(0,10000)
  16. print("put task {}".format(n))
  17. tasks.put(n)
  18. print("try get results...")
  19. for i in range(10):
  20. r = results.get(timeout=100)
  21. print("result:{}".format(r))
  22. manager.shutdown()
  23. print("master exit")
  24. # worker
  25. import time,sys,queue
  26. from multiprocessing.managers import BaseManager
  27. class QueueManager(BaseManager):
  28. pass
  29. QueueManager.register("get_task_queue")
  30. QueueManager.register("get_result_queue")
  31. # master的主机地址
  32. server_addr = "127.0.0.1"
  33. print("connect to server...")
  34. m = QueueManager(address=(server_addr,5000),authkey=b"abc")
  35. m.connect()
  36. tasks = m.get_task_queue()
  37. results = m.get_result_queue()
  38. for i in range(10):
  39. try:
  40. n = tasks.get(timeout=1)
  41. print("run task %d * %d..." % (n, n))
  42. r = "{} * {} = {}".format(n,n,n*n)
  43. time.sleep(1)
  44. results.put(r)
  45. except Queue.Empty:
  46. print("task queue is empty.")
  47. print("worker exit.")
参考

https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000
https://docs.python.org/3.6/library/subprocess.html
https://docs.python.org/3.7/library/subprocess.html

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/42965.html

相关文章

  • java并发编程学习之ConcurrentHashMap(JDK1.7)

    摘要:之前中提过,并发的时候,可能造成死循环,那么在多线程中可以用来避免这一情况。默认,当容量大于时,开始扩容并发数,默认,直接影响和的值,以及的初始化数量。初始化的数量,为最接近且大于的办等于的次方的值,比如,数量为,,数量为。 之前HashMap中提过,并发的时候,可能造成死循环,那么在多线程中可以用ConcurrentHashMap来避免这一情况。 Segment Concurrent...

    piglei 评论0 收藏0
  • java并发编程学习7--同步--synchronized关键字

    摘要:如果两个线程存取相同的对象,并且每一个线程都调用一个修改该对象状态的方法,根据线程访问数据的顺序,可能会出现错误的数据结果,这种现象成为条件竞争。而问题往往就是有多个线程同时在执行步骤。内部锁有如下的特点不能中断正在试图获得锁的线程。 【条件竞争 在多线程的开发中,两个及其以上的线程需要共享统一数据的存取。如果两个线程存取相同的对象,并且每一个线程都调用一个修改该对象状态的方法,根据线...

    zzzmh 评论0 收藏0
  • <java并发编程实战>学习一

    摘要:无状态的是线程安全的,当无状态变为有状态时就是不安全的破坏了线程的安全性,非原子性操作竞态条件在并发编程中,由于不恰当的执行时序而出现的不正确结果是一种非常重要的情况,被称之为竞态条件。重入意味着获取锁的操作的粒度是线程,而不是调用。 这本书的内容是什么? 本书提供了各种实用的设计规则,用于帮助开发人员创建安全的和高性能的并发类。 什么类是线程安全的? 当多个线程访问某...

    xiaoqibTn 评论0 收藏0
  • 从小白程序员一路晋升为大厂高级技术专家我看过哪些书籍?(建议收藏)

    摘要:大家好,我是冰河有句话叫做投资啥都不如投资自己的回报率高。马上就十一国庆假期了,给小伙伴们分享下,从小白程序员到大厂高级技术专家我看过哪些技术类书籍。 大家好,我是...

    sf_wangchong 评论0 收藏0
  • [Java并发-1]入门:并发编程Bug的源头

    摘要:所以这情况下,当线程操作变量的时候,变量并不对线程可见。总结,缓存引发的可见性问题,切换线程带来的原子性问题,编译带来的有序性问题深刻理解这些前因后果,可以诊断大部分并发的问题 背景介绍 如何解决并发问题,首先要理解并发问题的实际源头怎么发生的。 现代计算机的不同硬件的运行速度是差异很大的,这个大家应该都是知道的。 计算机数据传输运行速度上的快慢比较: CPU > 缓存 > I/O ...

    xiguadada 评论0 收藏0
  • 对python并发编程的思考

    摘要:我们以请求网络服务为例,来实际测试一下加入多线程之后的效果。所以,执行密集型操作时,多线程是有用的,对于密集型操作,则每次只能使用一个线程。说到这里,对于密集型,可以使用多线程或者多进程来提高效率。 为了提高系统密集型运算的效率,我们常常会使用到多个进程或者是多个线程,python中的Threading包实现了线程,multiprocessing 包则实现了多进程。而在3.2版本的py...

    sshe 评论0 收藏0

发表评论

0条评论

Cheriselalala

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<