生产者消费者模型

时间:2019-07-03 16:45:00 来源:互联网 作者: 神秘的大神 字体:

生产者消费者模型 *****

是什么

模型: 就是解决某个问题套路
生产者: 指的是产生数据的一方 (一段代码)
消费者: 指的是处理数据的一方 (一段代码)

生活中到处都是这种模型

例如:饭店 厨师就是生产者 吃饭的人就是消费者

例如: 先爬取网页数据(生产) 在解析网页数据 (消费)

生产者和消费者出啥问题了?

# 消费任务
def eat(food):
    for i in range(10):
        # 要消费
        time.sleep(random.randint(0, 2))
        print(food,"吃完了!")

# 生产任务
def make_rose():
    for i in range(10):
        # 再生产
        time.sleep(random.randint(0, 2))
        print("第%s盘青椒肉丝制作完成!" % i)
        rose = "第%s盘青椒肉丝" % i
                eat(rose) # 直接调用消费任务

# 开启任务 
make_rose()

生产者和消费,处理速度不平衡,一方快一方慢,导致一方需要等待另一方 整体效率低下

生产者消费者模型解决这个问题的思路:

原本,双方是耦合 在一起,消费必须等待生产者生成完毕在开始处理, 反过来

如果消费消费速度太慢,生产者必须等待其处理完毕才能开始生成下一个数据

解决的方案:

1.将双方分开来.一方专门负责生成,一方专门负责处理

这样一来数据就不能直接传递了 因为消费者可能还没有处理完成,为了使生产者可以不断的生成,则需要一个共同的容器

2.生产者完成后放入容器,消费者从容器中取出数据

这样就解决了双方能力不平衡的问题,做的快的一方可以继续做,不需要等待另一方

案例:

def eat(q):
    for i in range(10):
        # 要消费
        rose = q.get()
        time.sleep(random.randint(0, 2))
        print(rose,"吃完了!")

# 生产任务
def make_rose(q):
    for i in range(10):
        # 再生产
        time.sleep(random.randint(0, 2))
        print("第%s盘青椒肉丝制作完成!" % i)
        rose = "第%s盘青椒肉丝" % i
        # 将生成完成的数据放入队列中
        q.put(rose)

if __name__ == '__main__':
    # 创建一个共享队列
    q = Queue()
    make_p = Process(target=make_rose,args=(q,))
    eat_p =  Process(target=eat,args=(q,))


    make_p.start()
    eat_p.start()

joinableQueue

可翻译:为可join的队列

该队列相比普通的Queue的区别在于该对列额外增加的了join函数

join函数的作用:

​ 该函数为阻塞函数,会阻塞直到等待队列中所有数据都被处理完毕。

q = JoinableQueue()
q.put(1) 
q.get()
q.join() #阻塞 等待队列中所有数据都被处理完毕
print("over")

执行以上函数,将导致进程无法结束,注释掉join调用就正常,发现join的确有阻塞的效果,

但是队列中一共就一个数据,明明已经调用get取出了,为什么join依然阻塞呢?

这是因为get仅仅是取出数据,而join是等待数据处理完毕,也就是说:

取出数据还不算完,你处理完以后必须告知队列处理完毕,通过task_done

q = JoinableQueue()
q.put(1) 

q.get()
q.task_done() # 数据处理完毕

q.join() #阻塞 等待队列中所有数据都被处理完毕
print("over")
#输出:
#   over

需要注意的时,task_done的调用次数必大于等于队列中的数据个数,join才能正常结束阻塞

q = JoinableQueue()
q.put(1) 
q.put(1) 

q.get()
q.task_done() # 数据处理完毕


q.join() #阻塞 等待队列中所有数据都被处理完毕
print("over")
#输出:
#   over

总结:

主进程可以明确知道队列中的数据何时被处理完毕

守护进程与joinablequeue的应用

回顾之前的生产者消费者模型中,生产者与消费者都明确要处理的数据数量,但是实际开发中很多情况是无法提前明确的,例如:要爬去一个网站上的所有页面,页面数量数不固定的

from multiprocessing import Process,JoinableQueue,Queue
import  time,random
def producter(name,q):
    for i in range(5):
        time.sleep(random.randint(1,2))
        print("\033[46m%s生产了 热狗%s\033[0m" % (name,i))
        q.put("%s的 热狗%s" % (name,i))


def customer(name,q):
    while True:
        time.sleep(random.randint(1, 2))
        hot_dog = q.get()
        print("\033[47m%s 吃掉了 %s \033[0m" % (name,hot_dog))

if __name__ == '__main__':

    q = Queue()

    p1 = Process(target=producter,args=("北京*店",q))
    p2 = Process(target=producter,args=("上海*店",q))
    p3 = Process(target=producter, args=("深圳*店", q))
    p1.start()
    p2.start()
    p3.start()

    c1 = Process(target=customer,args=("王思聪",q))
    c1.start()

上述代码无法正常运行结束,是因为消费者进程中不清楚处理是否处理完成,所以一直在循环等待数据。

此时我们就可以使用joinablequeue队列来让主进程获取生成者进程是否生成完毕的信号从而结束子进程

from multiprocessing import Process,JoinableQueue,Queue




# q = JoinableQueue()
#
# q.put(1)
# q.put(1)
#
# q.get()
# q.task_done()
#
#
# q.join() #阻塞 等待队列中所有数据都被处理完毕
# print("over")

import  time,random
def producter(name,q):
    for i in range(5):
        time.sleep(random.randint(1,3))
        print("\033[46m%s生产了 热狗%s\033[0m" % (name,i))
        q.put("%s的 热狗%s" % (name,i))


def customer(name,q):
    while True:
        time.sleep(random.randint(1, 2))
        hot_dog = q.get()
        print("\033[47m%s 吃掉了 %s \033[0m" % (name,hot_dog))
        # 一个数据处理完毕
        q.task_done()

if __name__ == '__main__':

    # q = Queue()
    q = JoinableQueue()

    p1 = Process(target=producter,args=("北京*店",q))
    p2 = Process(target=producter,args=("上海*店",q))
    p3 = Process(target=producter, args=("深圳*店", q))
    p1.start()
    p2.start()
    p3.start()

    c1 = Process(target=customer,args=("王思聪",q))
    c1.daemon = True # 使子进程跟随主进程结束
    c1.start()

    # 等待生产者进程全部生成完毕
    p1.join()
    p2.join()
    p3.join()

    # 等待所有数据全部处理完毕
    q.join()

    # 终止子进程 也可以开启子进程前将子进程设置为守护进程来结束子进程
    # c1.terminate()

进程池:

​ 进程池与线程池使用方法完全一致,放到线程池一起讲