摘要:對線程池的研究是之前對分析的附加工作。在之前對源碼分析的文章中,寫到調度器將任務放入線程池的函數這里分析的線程池類是,也就是上述代碼中所使用的類。
對Python線程池的研究是之前對Apshceduler分析的附加工作。
在之前對Apshceduler源碼分析的文章中,寫到調度器將任務放入線程池的函數
def _do_submit_job(self, job, run_times): def callback(f): exc, tb = (f.exception_info() if hasattr(f, "exception_info") else (f.exception(), getattr(f.exception(), "__traceback__", None))) if exc: self._run_job_error(job.id, exc, tb) else: self._run_job_success(job.id, f.result()) f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name) f.add_done_callback(callback)
這里分析的線程池類是concurrent.futures.ThreadPoolExecutor,也就是上述代碼中self._pool所使用的類。先上self._pool.submit函數的代碼,再做詳細分析
def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError("cannot schedule new futures after shutdown") f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f
f和w是兩個非常重要的變量,f作為submit返回的對象,submit函數的調用者可以對其添加回調,待fn執行完成后,會在當前線程執行,具體是如何實現的,這里先不說,下面再詳細分析;w則是封裝了線程需要執行的方法和參數,通過self._work_queue.put(w)方法放入一個隊列當中。
self._adjust_thread_count()方法則是檢查當前線程池的線程數量,如果小于設定的最大值,就開辟一個線程,代碼就不上了,直接看這些個線程都是干嘛的
def _worker(executor_reference, work_queue): try: while True: work_item = work_queue.get(block=True) if work_item is not None: work_item.run() # Delete references to object. See issue16284 del work_item continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Notice other workers work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical("Exception in worker", exc_info=True)
這些線程就是一個死循環,不斷的從任務隊列中獲取到_WorkItem,然后通過其封裝方法,執行我們需要的任務。如果取到的任務為None,就往隊列中再放入一個None,以通知其它線程結束,然后結束當前循環。
def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException as e: self.future.set_exception(e) else: self.future.set_result(result)
如果沒有異常,執行結束后,會執行之前我們說的回調。在self.future.set_result(result)方法中會執行任務回調,當然了,是在當前線程中。如果需要寫入數據庫之類的操作,不建議在回調中直接寫入。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/40997.html
摘要:說多了都是淚,我之前排查內存泄漏的問題,超高并發的程序跑了個月后就崩潰。以前寫中間件的時候,就總是把用戶當,要盡量考慮各種情況避免內存泄漏。 從 Java 到 Python 本文為我和同事的共同研究成果 當跨語言的時候,有些東西在一門語言中很常見,但到了另一門語言中可能會很少見。 例如 C# 中,經常會關注拆箱裝箱,但到了 Java 中卻發現,根本沒人關注這個。 后來才知道,原來是因為...
摘要:獲取正在運行的線程數,用于狀態監控。之后初始化組件主要是初始化線程池將到中,初始化開始時間等。如果線程池中運行線程數量為,并且默認,那么就停止退出,結束爬蟲。 本系列文章,針對Webmagic 0.6.1版本 一個普通爬蟲啟動代碼 public static void main(String[] args) { Spider.create(new GithubRepoPageP...
摘要:開頭正式開啟我入職的里程,現在已是工作了一個星期了,這個星期算是我入職的過渡期,算是知道了學校生活和工作的差距了,總之,盡快習慣這種生活吧。當時是看的廖雪峰的博客自己也用做爬蟲寫過幾篇博客,不過有些是在前人的基礎上寫的。 showImg(https://segmentfault.com/img/remote/1460000010867984); 開頭 2017.08.21 正式開啟我...
閱讀 3718·2021-11-25 09:43
閱讀 2606·2021-11-18 13:11
閱讀 2220·2019-08-30 15:55
閱讀 3277·2019-08-26 11:58
閱讀 2831·2019-08-26 10:47
閱讀 2235·2019-08-26 10:20
閱讀 1278·2019-08-23 17:59
閱讀 3014·2019-08-23 15:54