1. 协程
1.1 协程基础
1.协程 :能够在一个线程下的多个任务之间来回切换,那么每一个任务都是一个协程。
2.协程的优点:
- 1.一个线程中的阻塞都被其他的各种任务沾满了
- 2.让操作系统觉得这个线程很忙,尽量的减少这个线程进入阻塞的状态,提高了单线程对CPU的利用率。
- 3.多个任务在同一个线程中执行,也达到了一个并发的效果,规避了每一个任务的io操作,减少了线程的个数,减轻了操作系统的负担。
3.协程是用户级别的,由我们自己写的python代码来控制切换的,是操作系统不可见的。
4.在Cpython解释器下:
- 由于多线程本身就不能利用多核,所以即便是开启了多个线程也只能轮流在一个CPU上执行,协程如果把所有任务的IO操作都规避掉,只剩下需要使用CPU的操作,就意味着协程就可以做到提高CPU利用率的效果。
- 协程和线程都不能利用多核,都是在一个CPU上轮流执行
5.多线程和协程的区别:
- 线程:切换需要操作系统,开销大,操作系统不可控,给操作系统的压力大,操作系统对IO操作的感知更加灵敏。
- 协程:切换需要python代码,开销小,用户操作可控,完全不会增加操作系统的压力,用户级别能够对IO操作的感知比较低。
1.2 切换
两种切换方式:
- 原生python完成 — asyncio(基于yield)
- C语言完成的python模块 —— gevent 或 greenlet
1.2.1 gevent模块 / greenlet模块
1.greenlet模块
import time
from greenlet import greenlet
def eat():
print('wusir is eating')
time.sleep(0.5)
g2.switch()
print('wusir finished eat')
def sleep():
print('小马哥 is sleeping')
time.sleep(0.5)
print('小马哥 finished sleep')
g1.switch()
g1 = greenlet(eat) # (实例化一个对象)创造一个协程任务
g2 = greenlet(sleep)
g1.switch() # 做切换
2.gevent模块
gevent是基于greenlet切换的
<1.> 没有返回值
import time
import gevent
def eat():
print('wusir is eating')
time.sleep(1)
print('wusir finished eat')
def sleep():
print('小马哥 is sleeping')
time.sleep(1)
print('小马哥 finished sleep')
g1 = gevent.spawn(eat) # 创造一个协程任务
gevent.sleep(1) # 阻塞,切换出去去执行任务
注意:
在gevent中,gevent.sleep(1)与time.sleep(1)执行的效果不一样,
gevent.sleep(1) — 切换、切出去,time.sleep(1) — 就是普通的睡,没有切换的效果。
如果想让gevent认识time.sleep() — 切出去,需要导入:
from gevent import monkey
monkey.patch_all()注意:
- 此时的time已经不是python中内置函数time了,已经被 patch_all 重写了。(效果见下面示例)
分辨gevent是否识别了我们写的代码中的io操作的方法:
- 在patchall之前打印一下涉及到io操作的函数地址
- 在patchall之后打印一下涉及到io操作的函数地址
- 如果两个地址一致,说明gevent没有识别这个io,如果不一致说明识别了
import time
print('-->',time.sleep)
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
print('wusir is eating')
print('in eat: ',time.sleep)
time.sleep(1)
print('wusir finished eat')
def sleep():
print('小马哥 is sleeping')
time.sleep(1)
print('小马哥 finished sleep')
g1 = gevent.spawn(eat) # 创造一个协程任务(发布任务)
g2 = gevent.spawn(sleep)
g1.join() # 阻塞 直到g1任务完成为止(没有join,主程序不会做切换)
g2.join()
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
print('wusir is eating')
time.sleep(1)
print('wusir finished eat')
def sleep():
print('小马哥 is sleeping')
time.sleep(1)
print('小马哥 finished sleep')
# 方式一:
# g1 = gevent.spawn(eat)
# g2 = gevent.spawn(sleep)
# # g1.join()
# # g2.join()
# gevent.joinall([g1,g2]) # gevent.joinall([g1,g2]) 相当于 g1.join() + g2.join()
# 方式二:
g_l = []
for i in range(10):
g = gevent.spawn(eat)
g_l.append(g)
gevent.joinall(g_l)
<2.> 有返回值
对象.value 接收返回值
value 是一个属性,没有阻塞功能,需要使用join / joinall 阻塞切换出去
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
print('wusir is eating')
time.sleep(1)
print('wusir finished eat')
return 'wusir***'
def sleep():
print('小马哥 is sleeping')
time.sleep(1)
print('小马哥 finished sleep')
return '小马哥666'
g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
gevent.joinall([g1,g2])
print(g1.value)
print(g2.value)
<3.> 用于socket
用于socket的 server端 时,遇到 io 操作就切换。
传参:gevent.spawn(函数名,参数1,参数2,……)
# server端
import gevent
from gevent import monkey
monkey.patch_all()
import socket
def chat(conn):
while True:
msg = conn.recv(1024).decode('utf-8')
conn.send(msg.upper().encode('utf-8'))
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()
while True:
conn,_ = sk.accept()
gevent.spawn(chat,conn)
# client端
import time
import socket
def client(i):
sk = socket.socket()
sk.connect(('127.0.0.1',9000))
while True:
sk.send('hello'.encode('utf-8'))
print(i*'*',sk.recv(1024))
time.sleep(0.5)
from threading import Thread
for i in range(500):
Thread(target=client,args =(i,)).start()
1.2.2 asyncio 模块
asyncio 模块是基于yield机制切换的
await asyncio.sleep() 异步 阻塞(做切换)
await 所在的函数前面必须加 async,变成一个async函数。
python原生的底层的协程模块:
- 爬虫、webserver框架
- 提高网络编程的效率和并发效果
语法:
1.await — async wait: 阻塞 协程函数这里要切换出去,还能保证一会儿再切回来
await 必须写在async函数里,async函数是协程函数
loop 事件循环
所有的协程的执行 调度 都离不开这个loop
1.起一个任务
import asyncio
async def demo(): # 协程方法(表示是一个async函数)
print('start')
await asyncio.sleep(1) # 阻塞
print('end')
loop = asyncio.get_event_loop() # 创建一个事件循环
loop.run_until_complete(demo()) # 把demo任务丢到事件循环中去执行
2.启动多个任务,并且没有返回值
import asyncio
async def demo(): # 协程方法
print('start')
await asyncio.sleep(1) # 阻塞
print('end')
loop = asyncio.get_event_loop() # 创建一个事件循环
wait_obj = asyncio.wait([demo(),demo(),demo()])
loop.run_until_complete(wait_obj)
3.启动多个任务并且有返回值
import asyncio
async def demo(): # 协程方法
print('start')
await asyncio.sleep(1) # 阻塞
print('end')
return 123
loop = asyncio.get_event_loop()
t1 = loop.create_task(demo())
t2 = loop.create_task(demo())
tasks = [t1,t2]
wait_obj = asyncio.wait([t1,t2])
loop.run_until_complete(wait_obj)
for t in tasks:
print(t.result())
4.谁先回来先取谁的结果
import asyncio
async def demo(i): # 协程方法
print('start')
await asyncio.sleep(10-i) # 阻塞
print('end')
return i,123
async def main():
task_l = []
for i in range(10):
task = asyncio.ensure_future(demo(i))
task_l.append(task)
for ret in asyncio.as_completed(task_l):
res = await ret
print(res)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
5.用asyncio做一个底层的爬虫示例
import asyncio
async def get_url():
reader,writer = await asyncio.open_connection('www.baidu.com',80)
writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n')
all_lines = []
async for line in reader:
data = line.decode()
all_lines.append(data)
html = '\n'.join(all_lines)
return html
async def main():
tasks = []
for url in range(20):
tasks.append(asyncio.ensure_future(get_url()))
for res in asyncio.as_completed(tasks):
result = await res
print(result)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main()) # 处理一个任务
1.3 总结
1.进程、线程、协程各自的特点:
- 1.进程:开销大,数据隔离,能利用多核,数据不安全,操作系统控制
- 2.线程:开销适中,数据共享,在cpython解释器下不能利用多核,数据不安全,操作系统控制
- 3.协程:开销小,数据共享,不能利用多核,数据安全,用户控制
2.ansyncio模块为我们提供了哪两个关键字?分别用什么作用?
- async 标识一个协程函数
- await 后面跟着一个asyncio模块提供的io操作的函数
- loop 事件循环,负责在多个任务之间进行切换的
3.关于读代码相关的问题:
- 1.开启线程是几乎不需要事件的,start是一个异步非阻塞方法
- 2.对于整数的 += 、-= 来说 异步的多线程数据不安全,如果是同步的数据就安全了
- 3.对于列表的操作:无论是异步还是同步的 都是数据安全的
4.有一个文件,这个文件中有20001行数据,开启一个线程池,为每100行创建一个任务,打印这100行数据。
# 方法一:
def print_line(lines):
print(lines)
from concurrent.futures import ThreadPoolExecutor
tp = ThreadPoolExecutor(20)
with open('file',encoding='utf-8') as f:
lines = []
for line in f:
if len(lines) == 100:
tp.submit(print_line,lines)
lines.clear()
lines.append(line)
if lines:
tp.submit(print_line, lines)
# 方法二:
def print_line(lines):
print(lines)
def read_file(filename):
with open(filename, encoding='utf-8') as f:
for line in f:
yield line
def submit_func(tp,line=None,end= False,lines = []):
if line:
lines.append(line)
if len(lines) == 100 or end:
tp.submit(print_line, lines)
lines.clear()
from concurrent.futures import ThreadPoolExecutor
tp = ThreadPoolExecutor(20)
for line in read_file('file'):
submit_func(tp,line)
submit_func(tp,end=True)
print 和 文件的读、写都是io操作:
上面这个题主要就是起线程在一个线程读文件的时候,利用这个线程读取文件的时间交给另一个线程来进行打印操作
使用线程可以有效的规避掉io操作的时间,提高程序的的效率
解耦程序的功能
默认参数是列表