时间:2021-03-05 23:49:00 来源:互联网 作者: 神秘的大神 字体:
Python进行异步执行的库有threading(多线程)和multiprocessing(多进程),这两个库为程序提供了丰富的异步操作,但是如果只是进行一些简单的异步执行,并不需要用到多复杂的场景,可以考虑使用concurrent.confutures。它提供一些简单常用的异步执行操作,比如submit和map方法,并且在异步执行完后还可以获取执行对象的返回结果。
这是一个抽象基类,后面会介绍的用于线程异步执行的ThreadPoolExecutor类和用于进程异步执行的ProcessPoolExecutor类都是Executor的子类,Executor抽象类定义了一些常用的异步操作,在进行多线程操作和多进程操作的时候都可以使用这些方法。
submit(fn, *args, **kwargs):调用可执行对象fn并以fn(*args, **kwargs)的方式执行,返回一个Future对象。
示例:
# max_workers用于指定pool中最大工作对象数量(线程或进程)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result()) # result方法可以获取fn的执行结果
map(func, *iterables, timeout=None, chunksize=1):这个函数和Python内置的map函数非常相似,但是有以下两点区别:
参数说明:
__next__()
已经被调用,并且Executor.map()
在timeout指定秒数后返回的结果不可用,则会抛出concurrent.futures.TimeoutError
。示例:
from concurrent.futures import ThreadPoolExecutor
def func(a):
return 6 / a
with ThreadPoolExecutor() as executor:
result = executor.map(func, [3, 2])
for res in result:
print(res)
'''
打印结果:
2.0
3.0
'''
shutdown(wait=True, *, cancel_futures=False):当future对象执行完成后向executor发送一个信号,释放正在使用的所有资源。但是这个函数之后再调用Executor.submit()和Executor.map()的话,则会抛出RuntimeError。
参数说明:
ThreadPoolExecutor是抽象类Executor的子类,使用一个线程池来执行所有的异步调用。注意,当一个future中调用了另一个future时会导致死锁。
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
示例:(来自官方文档)
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
# as_completed返回一个包含future对象的迭代器,但future对象只有执行完毕或被取消才能返回
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor是抽象Executor的一个子类,使用一个进程池来执行所有的异步调用。它会使用multiprocessing模块,并且会绕过GIL(全局解释锁),但这也意味着只能执行和返回可picklable的对象。注意:在子进程中再次调用另外的Executor对象或Future对象将会导致死锁。
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
示例:(来自官方文档)
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
concurrent.futures.Future:Future对象是对异步执行的封装,由Executor.submit()创建。
方法说明:
以下的一些方法只是用于单元测试和Executor的实现:
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待由fs指定的所有future对象(可能是由不同的Executor创建)执行完毕。返回一个由集合构成的二元组,第一个集合为done,包含了所有完成或者已被取消的future对象,第二个集合为not_done,包含了所有的待定或者正在运行的future对象。
concurrent.futures.as_completed(fs, timeout=None)
返回一个由fs指定的future实例的迭代器,当这些future实例执行完毕或者被取消时就会生成对应的future对象。fs指定的future实例如果存在重复的,则这些重复的实例只会返回一次。并且在调用该方法前就执行完成的future实例会被优先返回。如果在timeout指定的时间内还有future没有开始执行(调用__next__()
方法),则会抛出concurrent.futures.TimeoutError
,如果timeout没有指定,则会一直等待下去。
concurrent.futures.CancelledError:当一个future对象被取消的时候抛出。
concurrent.futures.TimeoutError:当一个future对象的执行时间超出了timeout指定的时间时抛出。
concurrent.futures.BrokenExecutor:派生自RuntimeError,当executor因某些原因被中断,导致不能submit或者不能执行一个新的任务时会抛出。
concurrent.futures.InvalidStateError:当某个操作在一个当前状态不允许的future上执行时抛出。
concurrent.futures.thread.BrokenThreadPool:派生自BrokenExecutor,当ThreadPoolExecutor中的一个executor初始化失败时抛出。
concurrent.futures.process.BrokenProcessPool:派生自BrokenExecutor(以前的名称为RuntimeError),当ProcessPoolExecutor中一个executor被非正常终止时(如被外部操作杀死时)抛出。
本文大多是从官方文档直接翻过来的:https://docs.python.org/3/library/concurrent.futures.html
发布此文章仅为传递网友分享,不代表本站观点,若侵权请联系我们删除,本站将不对此承担任何责任。