python — 协程

1. 协程

1.1 协程基础

1.协程 :能够在一个线程下的多个任务之间来回切换,那么每一个任务都是一个协程。

2.协程的优点:

  • 1.一个线程中的阻塞都被其他的各种任务沾满了
  • 2.让操作系统觉得这个线程很忙,尽量的减少这个线程进入阻塞的状态,提高了单线程对CPU的利用率。
  • 3.多个任务在同一个线程中执行,也达到了一个并发的效果,规避了每一个任务的io操作,减少了线程的个数,减轻了操作系统的负担。

3.协程是用户级别的,由我们自己写的python代码来控制切换的,是操作系统不可见的。
4.在Cpython解释器下:

  • 由于多线程本身就不能利用多核,所以即便是开启了多个线程也只能轮流在一个CPU上执行,协程如果把所有任务的IO操作都规避掉,只剩下需要使用CPU的操作,就意味着协程就可以做到提高CPU利用率的效果。
  • 协程和线程都不能利用多核,都是在一个CPU上轮流执行

5.多线程和协程的区别:

  • 线程:切换需要操作系统,开销大,操作系统不可控,给操作系统的压力大,操作系统对IO操作的感知更加灵敏。
  • 协程:切换需要python代码,开销小,用户操作可控,完全不会增加操作系统的压力,用户级别能够对IO操作的感知比较低。

1.2 切换

两种切换方式:

  • 原生python完成 — asyncio(基于yield)
  • C语言完成的python模块 —— gevent 或 greenlet
1.2.1 gevent模块 / greenlet模块

1.greenlet模块

import time
from  greenlet import greenlet

def eat():
    print('wusir is eating')
    time.sleep(0.5)
    g2.switch()
    print('wusir finished eat')

def sleep():
    print('小马哥 is sleeping')
    time.sleep(0.5)
    print('小马哥 finished sleep')
    g1.switch()

g1 = greenlet(eat)  # (实例化一个对象)创造一个协程任务
g2 = greenlet(sleep)
g1.switch()   # 做切换

2.gevent模块

gevent是基于greenlet切换的

<1.> 没有返回值

import time
import gevent
def eat():
    print('wusir is eating')
    time.sleep(1)
    print('wusir finished eat')
def sleep():
    print('小马哥 is sleeping')
    time.sleep(1)
    print('小马哥 finished sleep')

g1 = gevent.spawn(eat)  # 创造一个协程任务
gevent.sleep(1)  # 阻塞,切换出去去执行任务

注意:

  • 在gevent中,gevent.sleep(1)与time.sleep(1)执行的效果不一样,

    gevent.sleep(1) — 切换、切出去,time.sleep(1) — 就是普通的睡,没有切换的效果。

  • 如果想让gevent认识time.sleep() — 切出去,需要导入:

    from gevent import monkey
    monkey.patch_all()

    注意:

    • 此时的time已经不是python中内置函数time了,已经被 patch_all 重写了。(效果见下面示例)

分辨gevent是否识别了我们写的代码中的io操作的方法:

  • 在patchall之前打印一下涉及到io操作的函数地址
  • 在patchall之后打印一下涉及到io操作的函数地址
  • 如果两个地址一致,说明gevent没有识别这个io,如果不一致说明识别了
import time
print('-->',time.sleep)
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
    print('wusir is eating')
    print('in eat: ',time.sleep)
    time.sleep(1)
    print('wusir finished eat')

def sleep():
    print('小马哥 is sleeping')
    time.sleep(1)
    print('小马哥 finished sleep')

g1 = gevent.spawn(eat)  # 创造一个协程任务(发布任务)
g2 = gevent.spawn(sleep)
g1.join()   # 阻塞 直到g1任务完成为止(没有join,主程序不会做切换)
g2.join()
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
    print('wusir is eating')
    time.sleep(1)
    print('wusir finished eat')

def sleep():
    print('小马哥 is sleeping')
    time.sleep(1)
    print('小马哥 finished sleep')
    
# 方式一:
# g1 = gevent.spawn(eat)
# g2 = gevent.spawn(sleep)
# # g1.join()
# # g2.join()

# gevent.joinall([g1,g2])  # gevent.joinall([g1,g2]) 相当于 g1.join() + g2.join()

# 方式二:
g_l = []
for i in range(10):
    g = gevent.spawn(eat)
    g_l.append(g)
gevent.joinall(g_l)

<2.> 有返回值

对象.value 接收返回值

value 是一个属性,没有阻塞功能,需要使用join / joinall 阻塞切换出去

import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
    print('wusir is eating')
    time.sleep(1)
    print('wusir finished eat')
    return 'wusir***'

def sleep():
    print('小马哥 is sleeping')
    time.sleep(1)
    print('小马哥 finished sleep')
    return '小马哥666'

g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
gevent.joinall([g1,g2])
print(g1.value)
print(g2.value)

<3.> 用于socket

用于socket的 server端 时,遇到 io 操作就切换。

传参:gevent.spawn(函数名,参数1,参数2,……)

# server端
import gevent
from gevent import monkey
monkey.patch_all()
import socket

def chat(conn):
    while True:
        msg = conn.recv(1024).decode('utf-8')
        conn.send(msg.upper().encode('utf-8'))

sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()

while True:
    conn,_ = sk.accept()
    gevent.spawn(chat,conn)

# client端    
import time
import socket
def client(i):
    sk = socket.socket()
    sk.connect(('127.0.0.1',9000))

    while True:
        sk.send('hello'.encode('utf-8'))
        print(i*'*',sk.recv(1024))
        time.sleep(0.5)

from threading import Thread
for i in range(500):
    Thread(target=client,args =(i,)).start()
1.2.2 asyncio 模块

asyncio 模块是基于yield机制切换的

await asyncio.sleep() 异步 阻塞(做切换)

await 所在的函数前面必须加 async,变成一个async函数。

python原生的底层的协程模块:

  • 爬虫、webserver框架
  • 提高网络编程的效率和并发效果

语法:

  • 1.await — async wait: 阻塞 协程函数这里要切换出去,还能保证一会儿再切回来

    await 必须写在async函数里,async函数是协程函数

  • loop 事件循环

    所有的协程的执行 调度 都离不开这个loop

1.起一个任务

import asyncio
async def demo():   # 协程方法(表示是一个async函数)
    print('start')
    await asyncio.sleep(1)  # 阻塞
    print('end')

loop = asyncio.get_event_loop()  # 创建一个事件循环
loop.run_until_complete(demo())  # 把demo任务丢到事件循环中去执行

2.启动多个任务,并且没有返回值

import asyncio
async def demo():   # 协程方法
    print('start')
    await asyncio.sleep(1)  # 阻塞
    print('end')

loop = asyncio.get_event_loop()  # 创建一个事件循环
wait_obj = asyncio.wait([demo(),demo(),demo()])
loop.run_until_complete(wait_obj)

3.启动多个任务并且有返回值

import asyncio
async def demo():   # 协程方法
    print('start')
    await asyncio.sleep(1)  # 阻塞
    print('end')
    return 123

loop = asyncio.get_event_loop()
t1 = loop.create_task(demo())
t2 = loop.create_task(demo())
tasks = [t1,t2]
wait_obj = asyncio.wait([t1,t2])
loop.run_until_complete(wait_obj)
for t in tasks:
    print(t.result())

4.谁先回来先取谁的结果

import asyncio
async def demo(i):   # 协程方法
    print('start')
    await asyncio.sleep(10-i)  # 阻塞
    print('end')
    return i,123

async def main():
    task_l = []
    for i in range(10):
        task = asyncio.ensure_future(demo(i))
        task_l.append(task)
    for ret in asyncio.as_completed(task_l):
        res = await ret
        print(res)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

5.用asyncio做一个底层的爬虫示例

import asyncio

async def get_url():
    reader,writer = await asyncio.open_connection('www.baidu.com',80)
    writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n')
    all_lines = []
    async for line in reader:
        data = line.decode()
        all_lines.append(data)
    html = '\n'.join(all_lines)
    return html

async def main():
    tasks = []
    for url in range(20):
        tasks.append(asyncio.ensure_future(get_url()))
    for res in asyncio.as_completed(tasks):
        result = await res
        print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())  # 处理一个任务

1.3 总结

1.进程、线程、协程各自的特点:

  • 1.进程:开销大,数据隔离,能利用多核,数据不安全,操作系统控制
  • 2.线程:开销适中,数据共享,在cpython解释器下不能利用多核,数据不安全,操作系统控制
  • 3.协程:开销小,数据共享,不能利用多核,数据安全,用户控制

2.ansyncio模块为我们提供了哪两个关键字?分别用什么作用?

  • async 标识一个协程函数
  • await 后面跟着一个asyncio模块提供的io操作的函数
  • loop 事件循环,负责在多个任务之间进行切换的

3.关于读代码相关的问题:

  • 1.开启线程是几乎不需要事件的,start是一个异步非阻塞方法
  • 2.对于整数的 += 、-= 来说 异步的多线程数据不安全,如果是同步的数据就安全了
  • 3.对于列表的操作:无论是异步还是同步的 都是数据安全的

4.有一个文件,这个文件中有20001行数据,开启一个线程池,为每100行创建一个任务,打印这100行数据。

# 方法一:
def print_line(lines):
    print(lines)

from concurrent.futures import ThreadPoolExecutor
tp = ThreadPoolExecutor(20)
with open('file',encoding='utf-8') as f:
    lines = []
    for line in f:
        if len(lines) == 100:
            tp.submit(print_line,lines)
            lines.clear()
        lines.append(line)
    if lines:
        tp.submit(print_line, lines)
# 方法二:
def print_line(lines):
    print(lines)

def read_file(filename):
    with open(filename, encoding='utf-8') as f:
        for line in f:
            yield line

def submit_func(tp,line=None,end= False,lines = []):
    if line:
        lines.append(line)
    if len(lines) == 100 or end:
        tp.submit(print_line, lines)
        lines.clear()

from concurrent.futures import ThreadPoolExecutor
tp = ThreadPoolExecutor(20)
for line in read_file('file'):
    submit_func(tp,line)
submit_func(tp,end=True)
  1. print 和 文件的读、写都是io操作:

    上面这个题主要就是起线程在一个线程读文件的时候,利用这个线程读取文件的时间交给另一个线程来进行打印操作

  2. 使用线程可以有效的规避掉io操作的时间,提高程序的的效率

  3. 解耦程序的功能

  4. 默认参数是列表

点赞