python-使用`thread.join()`时多线程冻结

前端之家收集整理的这篇文章主要介绍了python-使用`thread.join()`时多线程冻结 前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

我正在尝试设置3个线程并在队列中执行5个任务.这个想法是线程将首先同时运行前三个任务,然后两个线程将完成其余两个任务.但是该程序似乎冻结了.我无法发现任何问题.

  1. from multiprocessing import Manager
  2. import threading
  3. import time
  4. global exitFlag
  5. exitFlag = 0
  6. class myThread(threading.Thread):
  7. def __init__(self,threadID,name,q):
  8. threading.Thread.__init__(self)
  9. self.threadID = threadID
  10. self.name = name
  11. self.q = q
  12. def run(self):
  13. print("Starting " + self.name)
  14. process_data(self.name,self.q)
  15. print("Exiting " + self.name)
  16. def process_data(threadName,q):
  17. global exitFlag
  18. while not exitFlag:
  19. if not workQueue.empty():
  20. data = q.get()
  21. print("%s processing %s" % (threadName,data))
  22. else:
  23. pass
  24. time.sleep(1)
  25. print('Nothing to Process')
  26. threadList = ["Thread-1","Thread-2","Thread-3"]
  27. nameList = ["One","Two","Three","Four","Five"]
  28. queueLock = threading.Lock()
  29. workQueue = Manager().Queue(10)
  30. threads = []
  31. threadID = 1
  32. # create thread
  33. for tName in threadList:
  34. thread = myThread(threadID,tName,workQueue)
  35. thread.start()
  36. threads.append(thread)
  37. threadID += 1
  38. # fill up queue
  39. queueLock.acquire()
  40. for word in nameList:
  41. workQueue.put(word)
  42. queueLock.release()
  43. # wait queue clear
  44. while not workQueue.empty():
  45. pass
  46. # notify thread exit
  47. exitFlag = 1
  48. # wait for all threads to finish
  49. for t in threads:
  50. t.join()
  51. print("Exiting Main Thread")

我不知道到底发生了什么,但是删除了join()部分之后,该程序就可以很好地运行了.我不明白的是,当队列清空时,exitFlag应该已经发出了信号.因此似乎以某种方式未通过process_data()检测到信号

最佳答案
您的代码有多个问题.首先,由于全局解释器锁(GIL),CPython中的线程不会“同时”运行Python代码.线程必须持有GIL才能执行Python字节码.默认情况下,如果线程没有阻止它,因为它确实阻塞了I / O,所以它在GIL中最多保留5毫秒(Python 3.2).为了并行执行Python代码,您将必须使用多重处理.

您也可以不必要地使用Manager.Queue而不是queue.Queue. Manager.Queue是在单独的管理器进程上的queue.Queue.您在此处引入了绕过IPC和内存复制的弯路,但没有任何好处.

造成僵局的原因是您在这里有竞争状况:

  1. if not workQueue.empty():
  2. data = q.get()

这不是原子操作.线程可以检查workQueue.empty(),然后删除GIL,让另一个线程耗尽队列,然后继续执行data = q.get(),如果您不将某些内容再次放入队列,它将永远阻塞. Queue.empty()检查是一种常规的反模式,不需要使用它.使用有毒药丸(前哨值)来代替获取循环并让工人知道他们应该退出.您需要与工作人员一样多的哨兵值.查找有关iter(callabel,sentinel)here的更多信息.

  1. import time
  2. from queue import Queue
  3. from datetime import datetime
  4. from threading import Thread,current_thread
  5. SENTINEL = 'SENTINEL'
  6. class myThread(Thread):
  7. def __init__(self,func,inqueue):
  8. super().__init__()
  9. self.func = func
  10. self._inqueue = inqueue
  11. def run(self):
  12. print(f"{datetime.now()} {current_thread().name} starting")
  13. self.func(self._inqueue)
  14. print(f"{datetime.now()} {current_thread().name} exiting")
  15. def process_data(_inqueue):
  16. for data in iter(_inqueue.get,SENTINEL):
  17. print(f"{datetime.now()} {current_thread().name} "
  18. f"processing {data}")
  19. time.sleep(1)
  20. if __name__ == '__main__':
  21. N_WORKERS = 3
  22. inqueue = Queue()
  23. input_data = ["One","Five"]
  24. sentinels = [SENTINEL] * N_WORKERS # one sentinel value per worker
  25. # enqueue input and sentinels
  26. for word in input_data + sentinels:
  27. inqueue.put(word)
  28. threads = [myThread(process_data,inqueue) for _ in range(N_WORKERS)]
  29. for t in threads:
  30. t.start()
  31. for t in threads:
  32. t.join()
  33. print(f"{datetime.now()} {current_thread().name} exiting")

示例输出

  1. 2019-02-14 17:58:18.265208 Thread-1 starting
  2. 2019-02-14 17:58:18.265277 Thread-1 processing One
  3. 2019-02-14 17:58:18.265472 Thread-2 starting
  4. 2019-02-14 17:58:18.265542 Thread-2 processing Two
  5. 2019-02-14 17:58:18.265691 Thread-3 starting
  6. 2019-02-14 17:58:18.265793 Thread-3 processing Three
  7. 2019-02-14 17:58:19.266417 Thread-1 processing Four
  8. 2019-02-14 17:58:19.266632 Thread-2 processing Five
  9. 2019-02-14 17:58:19.266767 Thread-3 exiting
  10. 2019-02-14 17:58:20.267588 Thread-1 exiting
  11. 2019-02-14 17:58:20.267861 Thread-2 exiting
  12. 2019-02-14 17:58:20.267994 MainThread exiting
  13. Process finished with exit code 0

如果您不坚持子类化Thread,也可以只使用multiprocessing.pool.ThreadPool也就是multiprocessing.dummy.Pool,它在后台为您做管道.

猜你在找的Python相关文章