资讯专栏INFORMATION COLUMN

「Python 面试」第三次更新

wslongchen / 1377人阅读

摘要:说一下进程线程以及多任务多进程多线程和协程进程概念一个程序对应一个进程,这个进程被叫做主进程,而一个主进程下面还有许多子进程。避免了由于系统在处理多进程或者多线程时,切换任务时需要的等待时间。

阅读本文大约需要 10 分钟。
14.说一下进程、线程、以及多任务(多进程、多线程和协程)

进程

概念
一个程序对应一个进程,这个进程被叫做主进程,而一个主进程下面还有许多子进程。

实现方式

fork()
示例:

import os
         
         
print("current_pid :%d" % os.getpid())
     
res = os.fork()
     
# 子进程返回的是 0
if res == 0:
print("res: %d" % res)
print("sub_pid: %d" % os.getpid())
     
# 主进程返回的是子进程的 pid
else:
    print("main_pid: %d" % os.getpid())
    print("res:%d" % res)
     
# 结果为
current_pid :12775
main_pid: 12775
res:12776
res: 0
sub_pid: 12776multiprocessing.Process

multiprocessing.Process
示例:

from multiprocessing import Process
import os, time
     
     
print("man_process pid : %d" % os.getpid())
     
class NewProcess(Process):
    def __init__(self):
        Process.__init__(self)
     
    def run(self):
        time.sleep(3)
        print("%d process was runing" % os.getpid())
     
np = NewProcess()
np.start()
     
# 结果为
man_process pid : 7846
7847 process was runing

multiprocessing.Pool

同步(apply)

示例:

from multiprocessing import Pool
import time, os, random
     
     
print("main_process pid: %d" % os.getpid())
     
def run():
    time.sleep(random.random())  # random.random() 随机生成一个小于 1 的浮点数
    print("%d process was runing" % os.getpid())
     
p = Pool(3)
     
for i in range(4):
    p.apply(run, args=())
     
p.close()
print("waiting for sub_process")
     
while True:
    # 获取 Pool 中剩余的进程数量
    count = len(p._cache)
    if count != 0:
        print("there was %d sub_process" % count)
        time.sleep(random.random())
    else:
        break
             
print("sub_process has done")
     
# 结果为
main_process pid: 4295
4297 process was runing
4296 process was runing
4298 process was runing
4297 process was runing
wating for sub_process
sub_process has done

异步(apply_async)
示例:

from multiprocessing import Pool
import time, os, random
          
          
print("main_process pid: %d" % os.getpid())
          
def run():
    # random.random() 随机生成一个小于 1 的浮点数
    time.sleep(random.random())  
    print("%d process was runing" % os.getpid())
   
p = Pool(3)
          
for i in range(4):
    p.apply_async(run, args=())
          
    p.close()
          
while True:
    # 获取 Pool 中剩余的进程数量
    count = len(p._cache)
    if count != 0:
        print("there was %d sub_process" % count)
        time.sleep(random.random())
    else:
        break
                  
print("wiating for sub_process..")
p.join()
          
print("sub_process has done")
          
# 结果为
main_process pid: 4342
wiating for sub_process..
there was 4 sub_process
4344 process was runing
there was 3 sub_process
4345 process was runing
4344 process was runing
4343 process was runing
sub_process has done

优缺点

fork()是计算机最底层的进程实现方式,一个fork()方法创建出来的进程有两个:主进程、子进程。fork()创建出来的进程,主进程不会等待子进程。

multiprocessing模块通过将fork方法封装成一个Process类,该类有一个start()方法,当调用该方法时,会自动调用run()方法,开启一个进程。并且由Process创建出来的进程,可以使用join()方法,使得主进程堵塞,被迫等待子进程。

multiprocess下另一种开启进程的方式是通过Pool进程池来实现。进程池可以开启多个进程来执行多个任务,但是进程数最大不会超过系统 CPU 核数。同样的,由Pool创建出来的进程,主进程也不会等待子进程,通过join()方法可以迫使主进程等待子进程,或者使用apply()同步的方式。

进程通信
进程之间的通信可以通过队列(Queue)来进行,多个进程一部分向队列里写入数据,一部分从队列里读取数据,从而完成多进程之间的通信问题。
示例:

from multiprocessing import Process, Queue
import random, time, os
  
  
def write(q):
    if not q.full():
        for i in range(4):
           q.put(i)
           print("%d was writing data[%d] to queue" % (os.getpid(), i))
              time.sleep(random.random())
    else:
        print("queue is full")
  
def read(q):
    # 等待队列被写入数据
    time.sleep(random.random())
    while True:
        if not q.empty():
            data = q.get()
            print("%d was reading data{%d} from queue" % (os.getpid(), data))
        else:
            print("queue is empty")
            break
      
# 创建通信队列,进程之间,全局变量不共享
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
      
pw.start()
pr.start()
      
pw.join()
pr.join()
print("end")
      
# 结果为
4640 was writing data[0] to queue
4640 was writing data[1] to queue
4640 was writing data[2] to queue
4641 was reading data{0} from queue
4641 was reading data{1} from queue
4641 was reading data{2} from queue
queue is empty
4640 was writing data[3] to queue
end

由于进程的执行顺序问题,造成了 pr 先于 pw 执行,所以 pr 未读取到数据,pr 进程任务结束,堵塞解开,主进程继续向下运行,最后 pw 任务结束。

进程通信改良
示例:

from multiprocessing import Process, Queue
import random, time, os
    
    
def write(q):
    if not q.full():
        for i in range(4):
            q.put(i)
            print("%d was writing data[%d] to queue" % (os.getpid(), i))
                  time.sleep(random.random())
    else:
        print("queue is full")
    
    def read(q):
        # 等待队列被写入数据
        time.sleep(random.random())
        while True:
            data = q.get()
            print("%d was reading data{%d} from queue" % (os.getpid(), data))
    
# 创建通信队列,进程之间,没有全局变量共享之说
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
    
pw.start()
pr.start()
    
pw.join()
# pr 进程立刻结束
pr.terminate()
print("end")
    
# 结果为
12898 was writing data[0] to queue
12898 was writing data[1] to queue
12898 was writing data[2] to queue
12899 was reading data{0} from queue
12899 was reading data{1} from queue
12899 was reading data{2} from queue
12898 was writing data[3] to queue
12899 was reading data{3} from queue
end

线程

概念
线程是进程下的一部分,进程下负责执行代码程序的就是线程,一个进程下会有很多个线程。同样的,一个主线程下面也有很多子线程。

另外,Python 中的线程依据的是 Java 中的线程模型,如果有兴趣的同学可以研究一下。

实现方式

示例:

import threading, time
  
  
def run():
    time.sleep(1)
    # currentThread() 返回的是当前的线程对象信息
    print("%s was runing" % threading.currentThread())
    print("current thread"name: %s" % threading.currentThread().getName())
  
# 创建一个线程
t = threading.Thread(target=run, args=())
  
# 启动线程
t.start()
  
# get_ident 返回的是当前线程对象所在的内存地址(id),该地址是唯一可以验证线程的数据
# 也可使用 currentThread().getName() 来简单的区分线程
print("current thread"name: %s" % threading.currentThread().getName())
print("main_thread tid: %s" % threading.get_ident())
  
# 结果为
current thread"name: MainThread
main_thread tid: 140427132020480
 was runing
current thread"name: Thread-1

线程通信

通信队列
通信队列作为相对来说最为安全的线程通信手段,其中Queue模块自身拥有所有所需的锁,这使得通信队列中的对象可以安全的在多线程之间共享。

这里用常见的「生产者-消费者模型」来介绍。

示例:

import threading, queue, time, random
    
flag = object()
    
def producter(q):
    for i in range(4):
        q.put(i)
    print("%s put data{%d} in queue" % (threading.currentThread().getName(), i))
    time.sleep(random.random())
    q.put(flag)
    
def consumer(q):
    time.sleep(random.random())
    while True:
        res = q.get()
        if res == flag:
            q.put(flag)
            break
        else:
            print("%s get data{%d} from queue" % (threading.currentThread().getName(), res))
    
# 创建队列
q = queue.Queue()
    
# 创建线程
pro = threading.Thread(target=producter, args=(q,))
con = threading.Thread(target=consumer, args=(q,))
    
pro.start()
con.start()
    
# 结果为
Thread-1 put data{0} in queue
Thread-1 put data{1} in queue
Thread-2 get data{0} from queue
Thread-2 get data{1} from queue
Thread-1 put data{2} in queue
Thread-2 get data{2} from queue
Thread-1 put data{3} in queue
Thread-2 get data{3} from queue
end

这里有一个细节。在多线程下,当生产者任务完成之后,向队列queue里添加了一个特殊对象(终止信号)flag,这样当消费者从queue中取出任务时,当取到flag时,意味着所有任务被取出,并再次将flag添加至queue中,这样其他线程中的消费者在接收到这个终止信号后,也会得知当前生产者任务已经全部发布。

轮询
通过为数据操作添加while循环判断,迫使线程被迫等待操作。(为了优化等待时间,应在最核心的位置添加判断条件)

示例:

import threading
        
        
class NewThread(threading.Thread):
    flag = 0
    g_num = 0
        
    def __init__(self):
         super().__init__()
        
    def run(self):
        print("%s was runing" % threading.currentThread().getName())
        if self.name == "Thread-1":
            self.add_num()
            NewThread.flag = 1
        else:
            # 轮询
            # Thread-2 被迫等待 Thread-1 完成任务之后才能执行
            while True:
                if NewThread.flag:
                    self.add_num()
                    break
        
    @classmethod
    def add_num(cls):
        global g_num
        for i in range(1000000):
            cls.g_num += 1
        print("on the %s, g_num: %d" % (threading.currentThread().getName(), cls.g_num))
        
t1 = NewThread()
t2 = NewThread()
        
t1.start()
t2.start()
        
# 结果为
Thread-1 was runing
Thread-2 was runing
on the Thread-1, g_num: 1000000
on the Thread-2, g_num: 2000000

互斥锁
互斥锁是专门为了针对线程安全而设计的一种结构,锁可以强制线程排序,保护线程安全,但是加锁、解锁会消耗系统 CPU 资源。

互斥锁优化

示例:

import threading
      
      
class NewThread(threading.Thread):
    g_num = 0
    # 生成锁对象
    lock = threading.Lock()
      
    def __init__(self):
         super().__init__()
      
         def run(self):
               # 判断当前线程是否上锁,若未上锁,则一直尝试上锁(acquire)直至成功
             with NewThread.lock:
                 print("%s was runing" % self.name)
                 self.add_num()
      
         @classmethod
         def add_num(cls):
             for i in range(1000000):
                 cls.g_num += 1
             print("on the %s g_num: %d" % (threading.currentThread().getName(), cls.g_num))
      
t1 = NewThread()
t2 = NewThread()
      
t1.start()
t2.start()
      
# 结果为
Thread-1 was runing
on the Thread-1 g_num: 1000000
Thread-2 was runing
on the Thread-2 g_num: 2000000

死锁问题
当多线程下出现多个锁,判断条件又是另一个线程里的锁时,就会出现一种情况:当另一个线程任务执行时间过长,或是线程结束,未解锁。当前线程由于迟迟无法上锁,程序始终阻塞,此时就会陷入死锁问题。

死锁问题解决

设置超时时间threading.Lock().acquire(timeout=3)只要在上锁时设置超时时间timeout=,只要超过时间,线程就会不再等待是否解锁,而是直接运行。但是这种方式很危险,可能会带来大量的等待时间。

为每个锁添加一个特殊编号,多线程在获取锁的时候严格按照该编号的升序方式来获取,相当于为线程排序,这样就避免了多线程因为资源争抢,而陷入死锁的可能。

银行家算法

进程与线程的区别

线程和进程的执行顺序都是一样的,都是由操作系统的调度算法决定,不是根据程序的编写顺序来决定。

进程是资源分配的单位,而线程是 CPU 调度的单位。

进程在主程序结束后,程序立马结束,需要手动利用join()方法使得主程序发生堵塞,来等待子进程。而主线程的任务结束后,程序会等待子线程结束才会结束。故不需要特意使用join()方法来使主线程等待子线程。

多进程适合 CPU 密集型,多线程适合 I/O 密集型。

协程

概念
线程下的一种,也叫微线程,单线程自身控制切换任务时机,达到多任务的效果。避免了由于系统在处理多进程或者多线程时,切换任务时需要的等待时间。这一点很像操作系统里的中断。

实现方式

生成器(yield)
生成器相关内容可看问题 13。

这里以一个简单的「生产者-消费者模型」来解释如何使用生成器实现协程。

示例:

import threading
     
     
def producter(c):
    next(c)
    n = 4
    print("%s was running" % threading.currentThread().getName())
    
    while n:
        print("product data: %d" % n)
        res = c.send(n)
        print(res)
        n -= 1
    print("sale out")
     
     
def consumer():
    res = ""
     
    print("%s was running" % threading.currentThread().getName())
    while True:
        n = yield res
     
        print("consume data: %d" % n)
        res = "200 OK"
     
print("%s was running" % threading.currentThread().getName())
c = consumer()
     
producter(c)
     
# 结果为
MainThread was running
MainThread was running
MainThread was running
product data: 4
consume data: 4
200 OK
product data: 3
consume data: 3
200 OK
product data: 2
consume data: 2
200 OK
product data: 1
consume data: 1
200 OK
sale out

可以看到,生产者事先不知道消费者具体要消费多少数据,生产者只是一直在生产。而消费者则是利用生成器的中断特性,consumer函数中,程序每一次循环遇到yield关键字就会停下,等待producter函数启动生成器,再继续下一次循环。

在这中间只有一个线程在运行,任务的切换时机由程序员自己控制,避免了由于多线程之间的切换消耗,这样就简单实现了协程。

异步 I/O(asyncio)
由于生成器在未来的 Python 3.10 版本中将不在支持协程,而是推荐使用asyncio库,该库适用于高并发。

自己目前不会,就不瞎 BB 了,具体可看文档。

asyncio 中文文档

未写完,下次更新补上

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/45144.html

相关文章

  • http相关面试

    摘要:状态码有那些分别代表是什么意思简单版继续,一般在发送请求时,已发送了之后服务端将返回此信息,表示确认,之后发送具体参数信息正常返回信息请求成功并且服务器创建了新的资源服务器已接受请求,但尚未处理请求的网页已永久移动到新位置。 http状态码有那些?分别代表是什么意思? 简单版 [ 100 Continue 继续,一般在发送post请求时,已发送了http header之后...

    沈建明 评论0 收藏0
  • 2018前端面试题汇总(更新...)

    摘要:方法一因为是从开始的方法二获取怎么实现和截取考察的用法。翻转字符串和删除数组的第一元素将字符串转化为数组。将数组进行翻转。将数组转换为字符串。被删除的第一个元素删除后的数组数组去重如果找到不到就把放到新数组里 1.运算题的结果 var name=jay var pe={ name:kang, getname:function () { ...

    smartlion 评论0 收藏0
  • 关于三次握手与四次挥手面试官想考我们什么?--- 不看后悔系列

    摘要:第三次握手客户端收到报文之后,会回应一个报文。因此,需要三次握手才能确认双方的接收与发送能力是否正常。三次握手的作用三次握手的作用也是有好多的,多记住几个,保证不亏。也就是说,第一次第二次握手不可以携带数据,而第三次握手是可以携带数据的。在面试中,三次握手和四次挥手可以说是问的最频繁的一个知识点了,我相信大家也都看过很多关于三次握手与四次挥手的文章,今天的这篇文章,重点是围绕着面试,我们应该...

    WilsonLiu95 评论0 收藏0
  • 2018年, 我的前端面试复盘

    摘要:技术一面一面主要考察基础,有些会有技术笔试,比如腾讯,。腾讯的面试官就很喜欢问,安全,浏览器缓存方面的问题,计算机基础,但是要懂为什么。 这篇文章简单总结下2018年内我的一些前端面试经历, 在这简单分享一下,希望对大家有所启发。 楼主在深圳,毕业两年。面的主要是深圳的几家公司。 包括: 腾讯, 蚂蚁金服, Lazada, Shopee, 有赞 等 。 楼主在准备面试前, 想着复习一...

    Yujiaao 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<