资讯专栏INFORMATION COLUMN

python3 queue多线程通信

89542767 / 217人阅读


  小编写这篇文章的主要目的,主要是给大家介绍关于python3 queue多线程通信,这里面有很多的技术性的难点,那么,该怎么去进行处理呢,下面小编给大家进行详细的解答一下。


  queue分类


  python3 queue分三类:


  先进先出队列


  后进先出的栈


  优先级队列


  他们的导入方式分别是:


 from queue import Queue
  from queue import LifoQueue
  from queue import


  具体方法见下面引用说明。


  Queue对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。当使用队列时,协调生产者和消费者的关闭问题可能会有一些麻烦。一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。


  例如:


  from queue import Queue
  from threading import Thread
  #用来表示终止的特殊对象
  _sentinel=object()
  #A thread that produces data
  def producer(out_q):
  for i in range(10):
  print("生产")
  out_q.put(i)
  out_q.put(_sentinel)
  #A thread that consumes data
  def consumer(in_q):
  while True:
  data=in_q.get()
  if data is _sentinel:
  in_q.put(_sentinel)
  break
  else:
  print("消费",data)
  #Create the shared queue and launch both threads
  q=Queue()
  t1=Thread(target=consumer,args=(q,))
  t2=Thread(target=producer,args=(q,))
  t1.start()
  t2.start()

  结果:

0.png

  本例里面有一个不寻常的位置:购买者在学到这些特殊值过后马上又将它放返回序列中,将它传下去。那样,任何窃听这一个序列的用户进程就能够关闭所有了。虽然序列是一种常见的线程间通信制度,但仍然能自己根据构建自已的程序设计并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用Condition变量来包装你的程序设计。下边这个例子演示了如何创建一个线程安全的优先级队列。


 import heapq
  import threading
  class PriorityQueue:
  def __init__(self):
  self._queue=[]
  self._count=0
  self._cv=threading.Condition()
  def put(self,item,priority):
  with self._cv:
  heapq.heappush(self._queue,(-priority,self._count,item))
  self._count+=1
  self._cv.notify()
  def get(self):
  with self._cv:
  while len(self._queue)==0:
  self._cv.wait()
  return heapq.heappop(self._queue)[-1]

  例子二、task_done和join


  使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的task_done()和join():


  from queue import Queue
  from threading import Thread
  class Producer(Thread):
  def __init__(self,q):
  super().__init__()
  self.count=5
  self.q=q
  def run(self):
  while self.count>0:
  print("生产")
  if self.count==1:
  self.count-=1
  self.q.put(2)
  else:
  self.count-=1
  self.q.put(1)
  class Consumer(Thread):
  def __init__(self,q):
  super().__init__()
  self.q=q
  def run(self):
  while True:
  print("消费")
  data=self.q.get()
  if data==2:
  print("stop because data=",data)
  #任务完成,从队列中清除一个元素
  self.q.task_done()
  break
  else:
  print("data is good,data=",data)
  #任务完成,从队列中清除一个元素
  self.q.task_done()
  def main():
  q=Queue()
  p=Producer(q)
  c=Consumer(q)
  p.setDaemon(True)
  c.setDaemon(True)
  p.start()
  c.start()
  #等待队列清空
  q.join()
  print("queue is complete")
  if __name__=='__main__':
  main()


  结果:

1.png

  例子三、多线程里用queue


  设置俩队列,一个是要做的任务队列todo_queue,一个是已经完成的队列done_queue。


  每次执行线程,先从todo_queue队列里取出一个值,然后执行完,放入done_queue队列。


  如果todo_queue为空,就退出。


  import logging
  import logging.handlers
  import threading
  import queue
  log_mgr=None
  todo_queue=queue.Queue()
  done_queue=queue.Queue()
  class LogMgr:
  def __init__(self,logpath):
  self.LOG=logging.getLogger('log')
  loghd=logging.handlers.RotatingFileHandler(logpath,"a",0,1)
  fmt=logging.Formatter("%(asctime)s%(threadName)-10s%(message)s","%Y-%m-%d%H:%M:%S")
  loghd.setFormatter(fmt)
  self.LOG.addHandler(loghd)
  self.LOG.setLevel(logging.INFO)
  def info(self,msg):
  if self.LOG is not None:
  self.LOG.info(msg)
  class Worker(threading.Thread):
  global log_mgr
  def __init__(self,name):
  threading.Thread.__init__(self)
  self.name=name
  def run(self):
  while True:
  try:
  task=todo_queue.get(False)
  if task:
  log_mgr.info("HANDLE_TASK:%s"%task)
  done_queue.put(1)
  except queue.Empty:
  break
  return
  def main():
  global log_mgr
  log_mgr=LogMgr("mylog")
  for i in range(30):
  todo_queue.put("data"+str(i))
  workers=[]
  for i in range(3):
  w=Worker("worker"+str(i))
  workers.append(w)
  for i in range(3):
  workers<i>.start()
  for i in range(3):
  workers<i>.join()
  total_num=done_queue.qsize()
  log_mgr.info("TOTAL_HANDLE_TASK:%d"%total_num)
  exit(0)
  if __name__=='__main__':
  main()

  输出日志文件结果:

2.png

  到此为止,小编就给大家介绍到这里了,希望可以给各位读者带来帮助。


文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/127777.html

相关文章

  • Python3结合Sciter编写桌面程序第二节

    摘要:第二节将任务添加到队列上一个栗子只是简单实现了下网页与后台的通信你可以在这里处理任何你想要的操作你已经点到我了但由于是同一个进程,如果你做了很耗时的操作,比如下载一张图片之类的操作你会发现,窗口卡住了,一般表现为窗口泛白,出现未响应的提示但 第二节 将任务添加到队列! 上一个栗子只是简单实现了下网页与后台的通信 def clickMe(self): #你可以在这里处理任何你想要...

    Scholer 评论0 收藏0
  • python 线程编程

    摘要:,,等实用方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。可能你对一个队列使用判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。 python 多线程编程 使用回调方式 import time def countdown(n): while n > 0: print(T-minus, n) n -...

    dreamGong 评论0 收藏0
  • Python -- Queue模块

    摘要:默认值为,指定为时代表可以阻塞,若同时指定,在超时时返回。当消费者线程调用意味着有消费者取得任务并完成任务,未完成的任务数就会减少。当未完成的任务数降到,解除阻塞。 学习契机 最近的一个项目中在使用grpc时遇到一个问题,由于client端可多达200,每个端口每10s向grpc server发送一次请求,server端接受client的请求后根据request信息更新数据库,再将数据...

    rubyshen 评论0 收藏0
  • 7-并发编程

    摘要:进程线程切换都需要使用一定的时间。子进程在中,如果要运行系统命令,会使用来运行,官方建议使用方法来运行系统命令,更高级的用法是直接使用其接口。 多线程 简单示例 对于CPU计算密集型的任务,python的多线程跟单线程没什么区别,甚至有可能会更慢,但是对于IO密集型的任务,比如http请求这类任务,python的多线程还是有用处。在日常的使用中,经常会结合多线程和队列一起使用,比如,以...

    Cheriselalala 评论0 收藏0
  • Python中编写并发程序

    摘要:在中由于历史原因使得中多线程的效果非常不理想使得任何时刻只能利用一个核并且它的调度算法简单粗暴多线程中让每个线程运行一段时间然后强行挂起该线程继而去运行其他线程如此周而复始直到所有线程结束这使得无法有效利用计算机系统中的局部性频繁的线程切换 GIL 在Python中,由于历史原因(GIL),使得Python中多线程的效果非常不理想.GIL使得任何时刻Python只能利用一个CPU核,...

    _ipo 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<