Understanding Python's multiprocessing.pool ThreadPool multithreading


The background: I was using multiple threads to hit the Elasticsearch search API and wanted to get the search results ES returned. By default, Python's threading module does not give you the return value of a thread's target function.

There are a few ways to collect the results after the threads have run:

1. Use a Queue: each thread puts its result onto a shared queue (see the sketch after this list).

2. Use a shared variable that the threads write into.
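
Here is a minimal sketch of the Queue approach (my own example, not from the original post); do_search is just a stand-in for a real ES query.

Python

# Minimal sketch of the Queue approach (Python 2): worker threads put their
# return values on a shared queue, and the main thread drains it afterwards.
import threading
import Queue  # "queue" in Python 3

def do_search(term, results):
    # stand-in for a real Elasticsearch query
    results.put('result for %s' % term)

results = Queue.Queue()
threads = [threading.Thread(target=do_search, args=(t, results))
           for t in ('foo', 'bar', 'baz')]
for t in threads:
    t.start()
for t in threads:
    t.join()

while not results.empty():
    print results.get()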

This post had quite a few typos, but by the time I wanted to fix them it had already been reposted on various forums, so let me note the original address here: http://xiaorui.cc/2015/11/03/%E7%90%86%E8%A7%A3python%E7%9A%84multiprocessing-pool-threadpool%E5%A4%9A%E7%BA%BF%E7%A8%8B/

We know that multiprocessing's Process/Pool can return the result of each call, and under multiprocessing there is also a thread-based pool where async_result.get() retrieves the result. So I got curious: how is ThreadPool implemented, or rather, how does multiprocessing implement multithreading?

Python offers two entry points to this thread pool: multiprocessing.dummy.Pool and multiprocessing.pool.ThreadPool.

Python

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('xiaorui.cc', 'foo',))

# async_result.get() retrieves the result.

return_val = async_result.get()
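
A quick side check of my own (not in the original post): the two entry points mentioned above really are the same thing, because multiprocessing.dummy.Pool() simply constructs a multiprocessing.pool.ThreadPool.

Python

# Both entry points hand back a ThreadPool instance.
from multiprocessing.dummy import Pool as DummyPool
from multiprocessing.pool import ThreadPool

p1 = DummyPool(processes=2)
p2 = ThreadPool(processes=2)
print type(p1), type(p2)  # both are multiprocessing.pool.ThreadPool

for p in (p1, p2):
    p.close()
    p.join()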

Now let's take a look at the source:

file:2.7/lib/python2.7/multiprocessing/pool.py

Python

# blog: http://xiaorui.cc

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result


# Retrieving the result
class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value


# When the Pool class is instantiated, a dedicated thread is started on the
# function below; its main job is to write results back into the cache.
class Pool(object):

    # (this assignment actually happens inside Pool.__init__)
    self._result_handler = threading.Thread(
        target=Pool._handle_results,
        args=(self._outqueue, self._quick_get, self._cache)
        )

    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
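
To make the ApplyResult behaviour above concrete, here is a small demo of my own (not from the original post): get(timeout) raises multiprocessing.TimeoutError while the worker is still running, and the callback passed to apply_async fires once the result-handler thread has stored the value.

Python

# Demo of ApplyResult: get(timeout) before the task finishes raises
# TimeoutError; the callback runs once _handle_results stores the value.
import time
from multiprocessing import TimeoutError
from multiprocessing.pool import ThreadPool

def slow_echo(msg):
    time.sleep(2)
    return msg

def on_done(value):
    print 'callback got:', value

pool = ThreadPool(processes=2)
async_result = pool.apply_async(slow_echo, ('xiaorui.cc',), callback=on_done)

try:
    async_result.get(timeout=0.5)   # task still sleeping -> TimeoutError
except TimeoutError:
    print 'not ready yet...'

print async_result.get()            # blocks until the worker finishes
pool.close()
pool.join()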

file:2.7/lib/python2.7/multiprocessing/dummy/__init__.py

Python

class DummyProcess(threading.Thread):

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        threading.Thread.__init__(self, group, target, name, args, kwargs)
        self._pid = None
        self._children = weakref.WeakKeyDictionary()
        self._start_called = False
        self._parent = current_process()

Process = DummyProcess
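
To confirm this (my own check, and it peeks at the internal _pool attribute, so not a stable API): the "processes" inside a ThreadPool are DummyProcess objects, which are plain threading.Thread subclasses, so no child processes are ever forked.

Python

# The pool's workers are DummyProcess objects, i.e. threading.Thread
# subclasses; no real child processes are created.
import threading
from multiprocessing.pool import ThreadPool

pool = ThreadPool(processes=2)
for worker in pool._pool:           # _pool is an internal attribute
    print worker, isinstance(worker, threading.Thread)   # True
pool.close()
pool.join()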

Python's multiprocessing module handles multi-process programming, and it ships a dummy submodule: multiprocessing.dummy wraps threading so the multiprocessing API can drive threads instead of processes. That said, the ThreadPool code in multiprocessing.pool does not read very smoothly. When the pool is instantiated it creates the number of workers you specify (defaulting to the CPU core count); for ThreadPool these workers are threads. Tasks pile up in the task queue and are pulled off by the workers, and the ApplyResult object lets us query a task's state and fetch its result.
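
Coming back to the original problem, here is a sketch of how I would fan out the ES queries with ThreadPool and gather the results; es_search is a hypothetical placeholder for whatever Elasticsearch client call you actually use.

Python

# Sketch: fan out several queries with apply_async and collect the results.
from multiprocessing.pool import ThreadPool

def es_search(keyword):
    # hypothetical placeholder for a real Elasticsearch client call
    return {'keyword': keyword, 'hits': []}

pool = ThreadPool(processes=4)
async_results = [pool.apply_async(es_search, (kw,))
                 for kw in ('python', 'golang', 'elasticsearch')]
pool.close()
pool.join()

for r in async_results:
    print r.get()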

END.


