Python threading.Thread只能用私有方法self来停止.__ Thread_stop()

前端之家收集整理的这篇文章主要介绍了Python threading.Thread只能用私有方法self来停止.__ Thread_stop()前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有一个函数接受大量的x,y对作为输入,使用numpy和scipy做一些精细的曲线拟合,然后返回一个值.为了尝试加快速度,我尝试使用Queue.Queue将数据提供给两个线程.数据完成后.我试图让线程终止,然后结束调用进程并将控制权返回给 shell.

我试图理解为什么我必须在threading中使用私有方法.线程停止我的线程并将控制返回到命令行.

self.join()不会结束程序.获得控制权的唯一方法是使用私有停止方法.

def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            # self.join(timeout=None) does not work
            self._Thread__stop()

这是我的代码的近似值:

class CalcThread(threading.Thread):
        def __init__(self,in_queue,out_queue,function):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue
            self.function = function
            self.finished = threading.Event()

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            self._Thread__stop()

        def run(self):
            while not self.finished.isSet():
                params_for_function = self.in_queue.get()
                try:
                    tm = self.function(paramsforfunction)
                    self.in_queue.task_done()
                    self.out_queue.put(tm)
                except ValueError as v:
                    #modify params and reinsert into queue
                    window = params_for_function["window"]
                    params_for_function["window"] = window + 1
                    self.in_queue.put(params_for_function)

    def big_calculation(well_id,window,data_arrays):
            # do some analysis to calculate tm
            return tm

    if __name__ == "__main__":
        NUM_THREADS = 2
        workers = []
        in_queue = Queue()
        out_queue = Queue()

        for i in range(NUM_THREADS):
            w = CalcThread(in_queue,big_calculation)
            w.start()
            workers.append(w)

        if options.analyze_all:
              for i in well_ids:
                  in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict))

        in_queue.join()
        print "ALL THREADS SEEM TO BE DONE"
        # gather data and report it from out_queue
        for i in well_ids:
            p = out_queue.get()
            print p
            out_queue.task_done()
            # I had to do this to get the out_queue to proceed
            if out_queue.qsize() == 0:
                out_queue.join()
                break
# Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method

        for aworker in workers:
            aworker.stop()

解决方法

通常,杀死修改共享资源的线程是个坏主意.

除非在执行计算时释放GIL,否则多线程中的cpu密集型任务比Python中的无用任务更糟糕.许多numpy函数确实发布了GIL.

ThreadPoolExecutor example from the docs

import concurrent.futures # on Python 2.x: pip install futures 

calc_args = []
if options.analyze_all:
    calc_args.extend(dict(well_id=i,...) for i in well_ids)

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation,args),args)
                           for args in calc_args)

    while future_to_args:
        for future in concurrent.futures.as_completed(dict(**future_to_args)):
            args = future_to_args.pop(future)
            if future.exception() is not None:
                print('%r generated an exception: %s' % (args,future.exception()))
                if isinstance(future.exception(),ValueError):
                    #modify params and resubmit
                    args["window"] += 1
                    future_to_args[executor.submit(big_calculation,args)] = args

            else:
                print('f%r returned %r' % (args,future.result()))

print("ALL work SEEMs TO BE DONE")

如果没有共享状态,您可以用ProcessPoolExecutor替换ThreadPoolExecutor.将代码放在main()函数中.

猜你在找的Python相关文章