Python中的多进程、多线程和协程

时间:2021-04-21 10:40:00 来源:互联网 作者: 神秘的大神 字体:

本文中的内容来自我的笔记。撰写过程中参考了胡俊峰老师《Python程序设计与数据科学导论》课程的内容。

目录
  • 并发处理:多进程和多线程
    • 前置
    • 多进程和多线程的比较
    • 多进程的机制和代码实现
      • 基本用法
      • 进程复制
      • 进程池
        • 进程池的基本用法
        • 进程池中的进程复制
        • 在进程池中利用子进程的返回值
      • 进程间通讯
        • Pipe
        • Queue
    • 多线程
      • 多线程的变量机制
  • 并发处理:协程
    • 用简单的生成器实现协程
    • 用回调函数(callback)将普通函数变为协程
    • 用async/await实现协程
      • 基础使用
      • wait_for()
      • 实现生产者-消费者协程

并发处理:多进程和多线程

前置

概念:

  • 并发:一段时间内同时推进多个任务,但不一定要在一个时刻同时进行多个任务。
  • 并行:一段时间内同时推进多个任务,且在一个时刻要同时进行多个任务。
  • 并行是并发的子集;单核CPU交替执行多个任务是并发但不是并行;多核CPU同时执行多个任务既是并发也是并行。

何时需要并发?

  1. 需要同时处理多个任务
  2. 经常需要等待资源
  3. 多个子过程互相协作

电脑执行任务的机制:

  • 操作系统内核 负责任务(i.e. 进程/线程)的挂起与唤醒,和资源的分配(比如一个程序能访问哪些内存地址)
  • 进程是资源分配的最小单元,不同进程之间不共享资源(比如可访问的内存区域);进程可再分为线程,线程之间共享大部分资源。
    • 正是因为 是否共享资源 上的区别,线程间的切换(即挂起和唤醒)比进程间的切换更快。
    • 线程是调度执行的最小单元。这意味着操作系统内核会负责将多个线程并发执行。

多进程和多线程的比较

多进程:

  • 将任务拆分为多个进程来进行
    • 由内核决定是并行还是仅仅并发。
  • 进程间不共享内存
    • 优点:一个进程挂了不影响别的
    • 缺点:切换进程耗时大、进程间通信不便

多线程:

  • 将任务拆分为一个进程内的多个线程来进行
    • 由内核决定是并行还是仅仅并发。
    • 在CPython解释器中有全局解释器锁,导致多线程只能并发而不能并行(多进程可以并行)。
  • 进程间共享内存
    • 优点:切换耗时低、通信方便
    • 缺点:在并行时对全局变量要使用锁机制
      • 锁机制:一个线程使用一个全局变量时,先等待其(被其他线程)解锁,再将其上锁,再使用,用后再解锁。
        • 如果不使用锁的话:100个a+=1的线程执行完成后(初始a=0),a可能<100
      • 数据科学中可以为了提高效率而不使用锁机制,但同时要容忍由此带来的差错。

多进程的机制和代码实现

以下介绍的函数中,几乎每一个有阻塞可能的,都会有一个可选的timeout参数。这件事将不再重提。

基本用法

from multiprocessing import Process
import os
import time

def task(duration, base_time):
    pid = os.getpid()
    print(f'son process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')
    time.sleep(duration)
    print(f'son process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

if __name__ == '__main__':
    pid = os.getpid()
    base_time = time.perf_counter()
    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
    p1 = Process(target=task, args=(1,base_time)) # a process that executes task(1,base_time); currently not running
    p2 = Process(target=task, args=(2,base_time)) # a process that executes task(2,base_time); currently not running
    p1.start()
    p2.start()
    print(p1.is_alive(), p2.is_alive()) # whether they are running
    print('main process can proceed while son processes are running')
    p1.join() # wait until p1 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes
    p2.join() # wait until p2 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes
    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
main process id 3316 starts at 0.000001s
True True
main process can proceed while son processes are running
son process id 15640 starts at 0.002056s with parameter 1
son process id 10716 starts at 0.003030s with parameter 2
son process id 15640 ends at 1.002352s
son process id 10716 ends at 2.017861s
main process id 3316 ends at 2.114324s

如果没有p1.join()p2.join(),主进程会在很短的时间内完成执行,而此时子进程仍在运行。输出结果如下:

main process id 11564 starts at 0.000001s
True True
main process can proceed while son processes are running
main process id 11564 ends at 0.011759s
son process id 13500 starts at 0.004392s with parameter 1
son process id 11624 starts at 0.003182s with parameter 2
son process id 13500 ends at 1.009420s
son process id 11624 ends at 2.021817s

为何0.004秒的事件在0.003秒的事件之前被输出?

  1. 因为print语句的耗时未被计算在内
  2. 因为perf_counter()在Windows下有bug,它给出的时间在不同进程之间不完全同步

需要注意,一个子进程结束运行后仍然处于存活状态;只有被join()之后才会正式死亡(即被从系统中除名)。

关于if __name__ == '__main__'::

  • 在Python中,可以通过import来获取其他文件中的代码;在文件B的开头(其他位置同理)import文件A,相当于把A在B的开头复制一份。
  • 如果在复制A的内容时,我们希望A中的一部分代码在执行时被忽略(比如说测试语句),就可以给A中的这些代码加上if __name__ == '__main__':
    • 对于从别处import来的代码,系统变量__name__在这段代码中会等于来源文件的名字(或模块名,这你不用在意);对于存在于本文件中的代码,__name__会等于__main__
  • 由于某些原因,在Windows下,如果一个文件的代码中使用了多进程,则这个文件中会隐式地import自己(一次或多次);将所有零级缩进的代码放在if __name__ == '__main__':中,可以避免产生重复执行的问题(注意到如果不这样做的话,import来的副本中还会再次import自身,导致无限递归import并报错)。
    • 暂时可以认为,采取这一措施后就可完全消除“隐式import自身”所产生的效应。

进程复制

from multiprocessing import Process
import os

pid = os.getpid()

def task():
    global pid
    print(pid)
    pid = 1
    print(pid)

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    print(pid)

在Windows下的输出:

4836
1
2944

在Linux下的输出:

511
1
511

前两个数都是由子进程输出,第三个数由父进程输出。

  • 注意到pid在子进程中被赋为1后,在父进程中并不是1。这说明,子进程的target函数中对运行环境的修改,不影响父进程的运行环境。事实上,反之也是成立的(父不影响子)。也就是说,一旦子进程的运行环境完成创建之后,父进程的运行环境与子进程的运行环境之间就完全独立。
    • 由于这个独立性,子进程的运算结果也无法直接反馈给父进程。稍后会介绍两种解决方式:1. 进程间通信 2. 利用 进程池apply方法的返回值。
  • 注意到一三行的输出在Windows下不同,而在Linux下相同。这说明,子进程中全局变量pid的取值,在Linux下是直接复制父进程中pid的取值而得到的,在Windows下是通过重新运行pid = os.getpid()而得到的。更一般地,有以下这两个事实:
    • 在Windows中,Process(target)创建出的子进程是一张白纸(即运行环境空空如也);当调用start()的时候,它会先通过import语句来将父进程的整个代码文件完整执行一遍(从而创建出一个新的运行环境),然后再开始运行target函数。所以,if __name__ == '__main__':包起来的代码,就只会被父进程执行;而未被包起来的零级缩进代码,则也会被每个子进程(在自己的运行环境里)各自执行一遍。
      • 这就是之前提到的“隐式import自身”的机制。
    • 在Linux中,Process(target)创建出的子进程,会全盘复制父进程的运行环境,而不会自己重新创建。复制出来的子进程运行环境,与父进程的运行环境完全独立。

Linux下的进程复制方式称为fork,Windows下的进程复制方式称为spawn。关于这些,详见 https://stackoverflow.com/questions/64095876/multiprocessing-fork-vs-spawn

from multiprocessing import Process
import os

def task():
    pass

if __name__ == '__main__':
    p = Process(target=task)
    print('son process created')
    p.start()
    print('son process starts')
    p.join()
    print('son process ends')

print('gu?')

在Windows下的输出

son process created
son process starts
gu?
son process ends
gu?

由此可见,Windows下子进程(在初始化时)在执行父进程的代码文件时,父进程中son_process.start()以后的内容(比如print('gu?'))也会被执行。

进程池

如果我们有很多的任务要同时进行,为每个任务各开一个进程既低效(创建和销毁进程开销大、无法全部并行、内核难以调度)又可能不被内核允许。

解决方案:使用进程池,池中放着几个进程(一般不会比CPU的核数多很多),有新任务时找一个空闲进程分配给它,没有空闲进程则等待。缺点是没有空闲进程时需要等待,因此不能算是完全的并发。

进程池的基本用法
from multiprocessing import Pool
import os, time

def task(duration, base_time, task_name):
    pid = os.getpid()
    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')
    time.sleep(duration)
    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')

if __name__ == '__main__':
    pid = os.getpid()
    base_time = time.perf_counter()
    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
    pool = Pool(3) # a pool containing 3 subprocesses
    print('start assigning tasks')
    for i in range(4):
        pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1))) # assign task to some process in pool and start running
                                                                        # if all son processes are busy, wait until one is free and then start
    pool.close() # no longer accepting new tasks, but already applied ones (including those that are waiting) keeps running.
    print('all tasks assigned; wait for son processes to finish')
    pool.join() # wait until all tasks are done, and then the pool is dead. `join()` can be called only if `close()` has already been called
    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

输出:(Win和Linux下输出相似)

main process id 5236 starts at 0.000002s
start assigning tasks
all tasks assigned; wait for son processes to finish
son process id 8724 starts working on TaskNo.1 at 0.030557s with parameter 1
son process id 14584 starts working on TaskNo.2 at 0.037581s with parameter 1
son process id 10028 starts working on TaskNo.3 at 0.041210s with parameter 1
son process id 14584 ends working on TaskNo.2 at 1.042662s
son process id 8724 ends working on TaskNo.1 at 1.040211s
son process id 14584 starts working on TaskNo.4 at 1.044109s with parameter 1
son process id 10028 ends working on TaskNo.3 at 1.054017s
son process id 14584 ends working on TaskNo.4 at 2.055515s
all tasks finished at 2.214534s
main process id 5236 ends at 2.214884s

当使用apply_async(“异步调用”)添加任务时,主进程在子进程执行任务期间会继续运行;如果用apply(“同步调用”)添加任务,则主进程会暂停(“阻塞”)直到该任务完成。一般使用apply_async而不是apply

进程池中的进程复制
from multiprocessing import Pool
import os, time

all_tasks_on_this_son_process = []

def task(duration, base_time, task_name):
    global all_tasks_on_this_son_process
    pid = os.getpid()
    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)
    time.sleep(duration)
    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
    all_tasks_on_this_son_process += [task_name]

if __name__ == '__main__':
    pid = os.getpid()
    base_time = time.perf_counter()
    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
    pool = Pool(3)
    print('start assigning tasks')
    for i in range(4):
        pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))
    pool.close()
    print('all tasks assigned; wait for son processes to finish')
    pool.join()
    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

print('gu?')

Windows下输出:

main process id 6116 starts at 0.000001s
start assigning tasks
all tasks assigned; wait for son processes to finish
gu?
gu?
gu?
son process id 16028 starts working on TaskNo.1 at 0.037577s with parameter 1, this process already executed []
son process id 11696 starts working on TaskNo.2 at 0.041393s with parameter 1, this process already executed []
son process id 5400 starts working on TaskNo.3 at 0.038409s with parameter 1, this process already executed []
son process id 11696 ends working on TaskNo.2 at 1.041521s
son process id 16028 ends working on TaskNo.1 at 1.038722s
son process id 11696 starts working on TaskNo.4 at 1.042543s with parameter 1, this process already executed ['TaskNo.2']
son process id 5400 ends working on TaskNo.3 at 1.052573s
son process id 11696 ends working on TaskNo.4 at 2.053483s
all tasks finished at 2.167447s
main process id 6116 ends at 2.167904s
gu?

在Windows下,池中的每个线程会在(且仅在)它分配到的的第一个任务将要开始执行时,运行一遍父进程的代码以构建运行环境。一个进程在前一个任务中对运行环境的改变,原样体现在下一个任务的运行环境里。(即接受新任务的时候会直接继续使用上一个任务遗留下的运行环境)

Linux下输出:

main process id 691 starts at 0.000001s
all tasks assigned; wait for son processes to finish
son process id 692 starts working on TaskNo.1 at 0.104757s with parameter 1, this process already executed []
son process id 693 starts working on TaskNo.2 at 0.104879s with parameter 1, this process already executed []
son process id 694 starts working on TaskNo.3 at 0.105440s with parameter 1, this process already executed []
son process id 692 ends working on TaskNo.1 at 1.106427s
son process id 693 ends working on TaskNo.2 at 1.106426s
son process id 694 ends working on TaskNo.3 at 1.107157s
son process id 692 starts working on TaskNo.4 at 1.107560s with parameter 1, this process already executed ['TaskNo.1']
son process id 692 ends working on TaskNo.4 at 2.110033s
all tasks finished at 2.117158s
main process id 691 ends at 2.117452s
gu?

在Linux下,池中的每个线程会在(且仅在)它的第一个任务将要开始执行时,从父进程将运行环境完整复制一遍。一个进程在前一个任务中对运行环境的改变,原样体现在下一个任务的运行环境里。(即接受新任务的时候会直接继续使用上一个任务遗留下的运行环境)

from multiprocessing import Pool
import os, time

all_tasks_on_this_son_process = []

def init(init_name):
    global all_tasks_on_this_son_process
    all_tasks_on_this_son_process += [init_name]

def task(duration, base_time, task_name):
    global all_tasks_on_this_son_process
    pid = os.getpid()
    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)
    time.sleep(duration)
    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
    all_tasks_on_this_son_process += [task_name]

if __name__ == '__main__':
    pid = os.getpid()
    base_time = time.perf_counter()
    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
    pool = Pool(3, initializer=init, initargs=('init',)) # look here
    print('start assigning tasks')
    for i in range(4):
        pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))
    pool.close()
    print('all tasks assigned; wait for son processes to finish')
    pool.join()
    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

输出(Win下与Linux下相似):

main process id 18416 starts at 0.000004s
start assigning tasks
all tasks assigned; wait for son processes to finish
son process id 10052 starts working on TaskNo.1 at 0.053483s with parameter 1, this process already executed ['init']
son process id 17548 starts working on TaskNo.2 at 0.040412s with parameter 1, this process already executed ['init']
son process id 10124 starts working on TaskNo.3 at 0.049992s with parameter 1, this process already executed ['init']
son process id 10124 ends working on TaskNo.3 at 1.054387s
son process id 17548 ends working on TaskNo.2 at 1.044956s
son process id 10052 ends working on TaskNo.1 at 1.062396s
son process id 10124 starts working on TaskNo.4 at 1.055888s with parameter 1, this process already executed ['init', 'TaskNo.3']      
son process id 10124 ends working on TaskNo.4 at 2.060094s
all tasks finished at 2.443017s
main process id 18416 ends at 2.444705s
在进程池中利用子进程的返回值
from multiprocessing import Pool
import time

def task(duration, base_time, task_name):
    time.sleep(duration)
    return f'{task_name} finished at {"%.6f" % (time.perf_counter()-base_time)}s'

if __name__ == '__main__':
    base_time = time.perf_counter()
    pool = Pool(2)
    return_values = []
    return_values.append(pool.apply(task, args=(1,base_time,'TaskNo.1_sync')))
    print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))
    return_values.append(pool.apply_async(task, args=(2,base_time,'TaskNo.2_async')))
    print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))
    pool.close()
    pool.join()
    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
    assert return_values[1].ready() == True
    return_values[1] = return_values[1].get() # from ApplyResult to true return value
    print('results:', return_values)
at time 1.2109459, r_v is ['TaskNo.1_sync finished at 1.027223s']
at time 1.2124976, r_v is ['TaskNo.1_sync finished at 1.027223s', <multiprocessing.pool.ApplyResult object at 0x0000016D24D79AC0>]     
all tasks finished at 3.258190s
results: ['TaskNo.1_sync finished at 1.027223s', 'TaskNo.2_async finished at 3.041053s']

这里在pool.join()之后调用result.get(),所以可以立刻得到 子进程所执行的函数的返回值;如果在对应的子进程尚未return时就调用result.get(),则主进程会阻塞直到子进程返回,然后获取子进程所执行的函数的返回值。result.ready()返回一个bool,表示对应的子进程是否已经return

此外,result.wait()会阻塞直到子进程返回,但不会获取返回值。

一个ApplyResult实例可以多次调用get(),即可以多次获取返回值。

详见 https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult

进程间通讯

可以认为,任何一个被跨进程传送的对象,在传送过程中都会被深拷贝。

Pipe
from multiprocessing import Process, Pipe
import time

def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
    print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    conn.send(content)
    print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))
    
def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
    print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    content = conn.recv()
    print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))
    return content

def task(conn, pipe_name, process_name, base_time):
    receive_from_pipe(conn, pipe_name, process_name, base_time)
    time.sleep(1)
    send_through_pipe(conn, pipe_name, process_name, 142857, base_time)

if __name__ == '__main__':
    base_time = time.perf_counter()
    conn_A, conn_B = Pipe() # two endpoints of the pipe
    p1 = Process(target=task, args=(conn_B,'pipe','son',base_time))
    p1.start()
    
    time.sleep(1)
    send_through_pipe(conn_A, 'pipe', 'main', ['hello','hello','hi'], base_time) # any object can be sent
    receive_from_pipe(conn_A, 'pipe', 'main', base_time)
    p1.join()
son tries to receive content from pipe at 0.036439
main tries to send ['hello', 'hello', 'hi'] through pipe at 1.035570
main successfully finishes sending at 1.037174
main tries to receive content from pipe at 1.037318
son successfully receives ['hello', 'hello', 'hi'] at 1.037794
son tries to send 142857 through pipe at 2.039058
son successfully finishes sending at 2.040158
main successfully receives 142857 at 2.040441

另外,还可以用conn.poll()(返回Bool类型)来获知conn中是否有对面发来的未读信息。

from multiprocessing import Process, Pipe
import time

def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
    print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    conn.send(content)
    print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))
    
def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
    print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    content = conn.recv()
    print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))
    return content

def task1(conn, pipe_name, process_name, base_time):
    receive_from_pipe(conn, pipe_name, process_name, base_time)
    time.sleep(1)
    send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)

def task2(conn, pipe_name, process_name, base_time):
    time.sleep(1)
    send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)
    time.sleep(2)
    receive_from_pipe(conn, pipe_name, process_name, base_time)

if __name__ == '__main__':
    base_time = time.perf_counter()
    conn_A, conn_B = Pipe()
    p1 = Process(target=task1, args=(conn_A,'pipe','son1',base_time))
    p2 = Process(target=task2, args=(conn_B,'pipe','son2',base_time))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
son1 tries to receive content from pipe at 0.033372
son2 tries to send greetings from son2 through pipe at 1.058998
son2 successfully finishes sending at 1.060660
son1 successfully receives greetings from son2 at 1.061171
son1 tries to send greetings from son1 through pipe at 2.062389
son1 successfully finishes sending at 2.063290
son2 tries to receive content from pipe at 3.061378
son2 successfully receives greetings from son1 at 3.061843

由此可见:

  • Pipe可以暂存数据,而且其暂存的数据符合FIFO规则。
    • 但是,Pipe用于暂存数据的区域大小比较有限(具体大小随OS而定),如果这个区域满了,send()就会被阻塞,直到对面用recv()腾出位置为止。
  • Pipe的两个端点可以分配给任意两个进程。
    • 不建议把同一个端点分配给多个进程,这可能会带来风险;如果确实需要的话,请使用Queue
Queue

本质上是一个能够跨进程运行的队列。

Queue的操作的时间开销约为Pipe中对应操作的两倍。

from multiprocessing import Process, Queue
import time

def put_into_queue(q, queue_name, putter_name, content, base_time):
    print(putter_name, 'tries to put', content, 'into', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    q.put(content)
    print(putter_name, 'successfully finishes putting at', '%.6f'%(time.perf_counter()-base_time))
    
def get_from_queue(q, queue_name, getter_name, base_time):
    print(getter_name, 'tries to receive content from', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    content = q.get()
    print(getter_name, 'successfully gets', content, 'at', '%.6f'%(time.perf_counter()-base_time))
    return content

def task1(q, delay, queue_name, process_name, base_time):
    time.sleep(delay)
    put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)
    time.sleep(5)
    get_from_queue(q, queue_name, process_name, base_time)

def task2(q, delay, queue_name, process_name, base_time):
    time.sleep(delay)
    get_from_queue(q, queue_name, process_name, base_time)
    time.sleep(5)
    put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)

if __name__ == '__main__':
    base_time = time.perf_counter()
    q = Queue()
    put_and_get_1 = Process(target=task1, args=(q,0,'queue','putAndGet_No.1',base_time))
    get_and_put_1 = Process(target=task2, args=(q,1,'queue','getAndPut_No.1',base_time))
    get_and_put_2 = Process(target=task2, args=(q,2,'queue','getAndPut_No.2',base_time))
    put_and_get_1.start()
    get_and_put_1.start()
    get_and_put_2.start()
    put_and_get_1.join()
    get_and_put_1.join()
    get_and_put_2.join()
putAndGet_No.1 tries to put christmas card from putAndGet_No.1 into queue at 0.077883
putAndGet_No.1 successfully finishes putting at 0.079291
getAndPut_No.1 tries to receive content from queue at 1.104196
getAndPut_No.1 successfully gets christmas card from putAndGet_No.1 at 1.105489
getAndPut_No.2 tries to receive content from queue at 2.126434
putAndGet_No.1 tries to receive content from queue at 5.081044
getAndPut_No.1 tries to put christmas card from getAndPut_No.1 into queue at 6.106381
getAndPut_No.1 successfully finishes putting at 6.107820
getAndPut_No.2 successfully gets christmas card from getAndPut_No.1 at 6.108565
getAndPut_No.2 tries to put christmas card from getAndPut_No.2 into queue at 11.109579
getAndPut_No.2 successfully finishes putting at 11.112493
putAndGet_No.1 successfully gets christmas card from getAndPut_No.2 at 11.113546

另外,如果Queue的大小实在过大以至于达到了某个上限,则put()操作也会被阻塞。不过应该很难把大小弄到那么大。

多线程

基本语法和多进程很相似,但机制上有重要的不同。由于全局解释器锁的存在,Python多线程并不实用,这里仅作简单介绍。

从下图中可以看到,多线程的基本代码和多进程完全一致。下图中的代码在CPython解释器中会运行大约3s。

另外,多线程中其实不需要这个if __name__ == '__main__':的判断。

多线程的变量机制

import threading

lock_n = threading.Lock()
n = 0

def inc_n(m):
    global n
    lock_n.acquire(blocking=True)
    n += m
    lock_n.release()

threads = [threading.Thread(target=inc_n, args=(i,)) for i in range(1,11)]
[t.start() for t in threads]
[t.join() for t in threads]

print(n)
55
  • 由上可见,不同的线程之间共享运行环境(比如上面的变量n)。
  • lock.acquire(blocking=True) 会一直阻塞直到锁空出来为止;一旦空出来就会把它锁上。

并发处理:协程

不同的过程在同一个线程内交替执行。每个协程在运行时独占资源,一段运行结束后自阻塞,等待着被外部(如main函数)控制它的代码唤醒。

相比多线程的优点:轻量级(在解释器层面实现,不需要内核来做切换)、数量不限。

和多线程一样,不同协程之间共用运行环境。

用简单的生成器实现协程

def sum(init):
    s = init
    while True:
        delta = yield s # output s, and then input delta
        s += delta
        
g = sum(0)
print(next(g)) # run entil receiving the first output
print(g.send(1)) # send the first input, and then get the second output
print(g.send(2)) # send the second input, and then get the third output
0
1
3

上例中只是演示了生成器的自阻塞,以及生成器与其调用者之间的交互。

更进一步,还可以定义多个生成器执行不同的过程,并在main函数里进行对它们的调度(比如实现一个任务队列),从而实现协程。

用回调函数(callback)将普通函数变为协程

def calc(n,callback):
    r = 0
    for i in range(n):
        r += i
        callback()

def pause():
    print('pause')
    yield # just pause, do not return anything

g = calc(10,pause)

async/await实现协程

相比生成器实现的优点:可以在等待IO/等待网络通信等情况下时阻塞当前协程执行其他协程(而且不会中断等待IO/通信)以节省时间(而只用生成器则无法做到);使用更灵活、方便。

  • 多线程其实也有前一个优点。所以CPython下的多线程也并不是毫无用处,但它的用处是协程用处的子集。
  • 一个注意事项:若想通过协程加速IO,必须使用python中专门的异步IO库才行。

基础使用

import time

start = time.perf_counter()

def sayhi(delay):
    time.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

def main():
    sayhi(1)
    sayhi(2)

main()
hi! at 1.0040732999914326
hi! at 3.015253899997333
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    sayhi1 = asyncio.create_task(sayhi(1))
    sayhi2 = asyncio.create_task(sayhi(2))
    await sayhi1
    await sayhi2

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0037910000100965
hi! at 2.0026504999987083

上面的程序中:

  • async 声明当前函数是一个协程。这一声明使得函数体内可以使用create_taskawait,也使得该函数本身可以被create_taskawait
    • 一旦一个函数 f 被声明为协程,f()做的事就不再是运行 f,而只是创建一个协程对象并返回之(这个对象并不会自动被运行)。需要使用 asyncio 中的相关工具来运行这个对象。

  • run(main()) 表示开始执行协程 main() 。要求 main() 必须是“主协程”,即它是整个程序中所有协程的入口(类似主函数)。一个程序中 run(main()) 只应被调用一次,且在 main() 之外不应有任何协程被调用。
    • run(main()) 是阻塞的。协程的并发特性只有在main()内部才会显现,从外部来看这就是一个普普通通的黑箱调用。
    • run()的作用是启动 运行协程所需的环境(并在main()完成后将其关闭)。但在IPython中,一开始运行就已经自动帮你启动好了,所以可以直接用await(而且也不必把所有协程都放在一个主协程中,而可以散布在程序各处)。

  • create_task(sayhi(1)) 表示为协程sayhi(1)在某个“任务池”中创建一个任务,并且开始执行该任务。返回值是这个任务的handle,或者说“遥控器”。

    • 任务池中的任务会并发地执行。任务在何时可以中断并切换到别的任务,这一点由await指定。
  • await sayhi1 有两重含义:

      1. 阻塞当前协程(该语句所在的协程,这里是main())的执行,直到任务sayhi1完成。(类似Process.join()
      1. 告诉解释器,现在当前协程(该语句所在的协程)开始阻塞,你可以切换协程了。
      • 如果这里await的不是sayhi1而是,比如说,一个接受http请求的操作,那么在解释器切换协程后不会影响对这个请求的等待。这就是asyncio的强大之处。
        • 这一点在await asyncio.sleep(delay)就有体现。asyncio.sleep()就具有“切换协程不影响等待”的特性。
  • 关于await的几件事:

    • await的可以不是已创建的任务而是一个协程对象(比如await sayhi(1)),此时不会将其加入任务池,而会直接开始执行(当然,也可能刚开始执行就被切换到别的协程,因为用了await),并一直阻塞直到完成。这会导致sayhi(1)无法作为一个任务、与其他任务平等地参与并发,但是它仍然可以随着父协程(这里是main())的中断和恢复而间接地参与并发。
    • 能够被await的不只有协程对象和任务handle,还有任何awaitable object,即任何实现了__await__方法(从而告诉了解释器如何在自己刚开始执行时就阻塞并切换协程,且不影响内部可能在进行的等待和其他操作)的对象。
    • await 的对象只可能在刚开始执行时立刻阻塞并切换协程。执行过程中其他可以阻塞的位置,是由这个对象内部使用的其他await语句指定的,而不是调用这个对象的那条await语句。
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    await sayhi(1)
    await sayhi(2)

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0072715999995125
hi! at 3.0168006000021705

wait_for()

await AA为任意awaitable)改成await asyncio.wait_for(A,timeout),就可以给await操作加上timeout秒的时限,一旦await了这么多秒还没有结束,就会中断A的执行并抛出asyncio.TimeoutError

不用关心wait_for具体做了什么,你只需要记住await asyncio.wait_for(A,timeout)这个句子就行。可以认为这个句子和await A在(除了timeout以外的)其他方面上没有区别。下面是例子。

import time
import asyncio


async def eternity():
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
timeout!
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    sayhi1 = asyncio.create_task(sayhi(1))
    sayhi2 = asyncio.create_task(sayhi(2))
    await asyncio.wait_for(sayhi1,1.05)
    await asyncio.wait_for(sayhi2,1.05)

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0181081000046106
hi! at 2.0045300999918254
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    sayhi1 = asyncio.create_task(sayhi(1))
    sayhi2 = asyncio.create_task(sayhi(2))
    await asyncio.wait_for(sayhi1,0.95)
    await asyncio.wait_for(sayhi2,1.05)

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
---------------------------------------------------------------------------

TimeoutError                              Traceback (most recent call last)

<ipython-input-89-7f639d54114e> in <module>
     15 
     16 # asyncio.run(main()) # use outside IPython
---> 17 await main() # use inside IPython


<ipython-input-89-7f639d54114e> in main()
     11     sayhi1 = asyncio.create_task(sayhi(1))
     12     sayhi2 = asyncio.create_task(sayhi(2))
---> 13     await asyncio.wait_for(sayhi1,0.95)
     14     await asyncio.wait_for(sayhi2,1.05)
     15 


~\anaconda3\lib\asyncio\tasks.py in wait_for(fut, timeout, loop)
    488             # See https://bugs.python.org/issue32751
    489             await _cancel_and_wait(fut, loop=loop)
--> 490             raise exceptions.TimeoutError()
    491     finally:
    492         timeout_handle.cancel()


TimeoutError: 


hi! at 2.0194762000028277

另外,注意到即使协程sayhi1抛出了异常,父协程main()仍然能够继续执行sayhi2。可见不同协程间是有一定的独立性的。

实现生产者-消费者协程

为此需要使用 asyncio.Queue 。它相比普通的队列的区别是,其put/get操作会在无法执行时阻塞(这一点和multiprocessing.Queue很像),而且这些操作都是协程(注意到,这使得你调用它们时只会返回协程对象而不会实际执行),可以await

import time
import asyncio

start = time.perf_counter()

async def producer(q):
    for i in range(5):
        await asyncio.sleep(1) # producing takes 1 sec
        await q.put(i) # will wait if q is full
        print(f'put {i} at {time.perf_counter() - start}')
    
    await q.join() # will wait until all objects produced are **taken out** and **consumed**.

async def consumer(q):
    for i in range(5):
        item = await q.get() # will wait if q is empty. BTW we see that "await XXX" is an expression not a command.
        print(f'get {item} at {time.perf_counter() - start}')
        await asyncio.sleep(1) # consuming takes 1 sec
        q.task_done() # tells the queue that [the object just taken out] has been consumed. just taking out is not enough!
        print(f'consumed {item} at {time.perf_counter() - start}')
    
async def main():
    q = asyncio.Queue()
    P = asyncio.create_task(producer(q))
    C = asyncio.create_task(consumer(q))
    await P
    await C
    print(f'done at {time.perf_counter() - start}')

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
put 0 at 1.0108397000003606
get 0 at 1.0112231999955839
put 1 at 2.017216499996721
consumed 0 at 2.0176210000063293
get 1 at 2.0177472999930615
put 2 at 3.0279211000015493
consumed 1 at 3.0283254999958444
get 2 at 3.028457599997637
put 3 at 4.039952199993422
consumed 2 at 4.041183299996192
get 3 at 4.041302300000098
put 4 at 5.0465819999953965
consumed 3 at 5.04690839999239
get 4 at 5.047016099997563
consumed 4 at 6.047789799995371
done at 6.048323099996196
import time
import asyncio

start = time.perf_counter()

async def sleep_and_put(q):
    await asyncio.sleep(1)
    await q.put(1)

async def main():
    q = asyncio.Queue()
    C = asyncio.create_task(q.get())
    P = asyncio.create_task(sleep_and_put(q))
    await C
    await P
    print(f'finished at {time.perf_counter() - start}')

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
finished at 1.01112650000141

由上例可见,Queue.get()(其实Queue.put()等其他方法也一样)是一个协程,因此也可以给它创建任务以进行并发。