并发编程:多进程
进程创建的两种方式
#第一种
from multiprocessing import Process
import time
def func(name):(
print(f'{name}子进程开始')
time.sleep(1)
print(f'{name}子进程结束')
if __name__=='__main__':
times=time.time()
x = Process(target=func,args('宋',)) #里面一定要放元组
x.start() #启动进程,并调用该子进程中的x.run()
time.sleep(1) #x.run()调用target制定的函数,启动进程运行的方法
print('主进程启动')
print(f'{time.time()-times}')
#第二种
from multiprocessing import Process
import time
class A(Process): #父类一定是Process
def __init__(self,name):
self.name=name
super().__init__()
def run(self): #一定要有run方法!!!!!!
print(f'{self.name}子进程开始')
time.sleep(1)
print(f'{self.name}子进程结束')
if __name__=='__main__':
times=time.time()
x=A('宋',)
x.start() #只是想操作系统发憷一个开辟子进程的信号,执行下一行,信号接收到了,会在内存总开辟一个进程空间,然后再讲主进程所有数据copy加载到子进程,然后在调用CPU去执行,开辟子进程开销是很大的
time.sleep(1)
print(f'{time.time()-times}主进程开启')
#所以永远会先执行主进程的代码
x.terminate() #强制终止进程x 不会进行任何清理操作
x.is_alive() #判断进程是不是还活着
x.join() #主进程等待执行完子进程以后再执行主进程
x.daemon() #默认值为Flase 代表x为后台运行的守护进程
x.name #查看进程的名称
x.pid #进程的pid
注意:在Windows中process()必须放到if __name__==’__main__’下
获取进程pid
import os
import time
print(f'子进程:{os.getpid()}')
print(f'主进程:{os.getppid()}')
验证进程之间的空间
进程与进程之间是有物理隔离,不能共享内存的数据(lock,队列)
from multiprocessing import Process
import time
import os
name='song'
def task():
global name
name='111'
print(f'{name}')
if __name__=='__main__':
p.Process(target=task)
p.start()
time.sleep(2)
print(f'主开始{name}')
from multiprocessing import Process
import time
import os
lst=[11,22,33]
def task():
global name
name.append(44)
print(f'{lst}')
if __name__=='__main__':
p.Process(target=task)
p.start()
time.sleep(2)
print(f'主开始{lst}')
#可变不可变都会隔离
join
#join就是主进程先让子进程执行结束,再执行主进程
from multiprocessing import Process
import time
def func(name):
print(f'{name} is running')
time.sleep(1)
print(f'{name} is out')
if __name__ == '__main__':
p = Process(target=task,args=('song',)) # 创建一个进程对象
p.start()
p.join()
print('==主开始')
#多个子进程使用join
from multiprocessing import Process
import time
def func(name,s):
print(f'{name}开始')
time.sleep(s)
print(f'{name}结束')
def func1(name,s):
print(f'{name}开始')
time.sleep(s)
print(f'{name}结束')
def func2(name,s):
print(f'{name}开始')
time.sleep(s)
print(f'{name}结束')
if __name__ == '__main__':
times=time.time()
li=[]
for i in range(1,4):
x = Process(target=func,args=('song',i))
x.start()
li.append(x)
#
for i in li:
i.join()
print('主进程开始')
x=Process(target=func,args=('宋',3))
x1=Process(target=func,args=('李',2))
x2=Process(target=func,args=('王',1))
x.start()
x1.start()
x2.start()
#
x.join()
print(f'{time.time()-times}')
x1.join()
print(f'{time.time()-times}')
x2.join()
print(f'主进程开启{time.time()-times}')
.代码优化
from multiprocessing import Process import time def func(name,s): print(f'{name}开始') time.sleep(s) print(f'{name}结束') if __name__ == '__main__': times=time.time() li=[] for i in range(1,4): x = Process(target=func,args=('song',i)) x.start() li.append(x) for i in li: i.join() print('主进程开始')
进程的其他参数
x.terminate() #杀死子进程
x.join() #先执行子进程,后执行主进程
print(x.is_alive()) #判断进程还在不在
# from multiprocessing import Process
# import time
#
# def task(name):
# print(f'{name} is running')
# time.sleep(2)
# print(f'{name} is gone')
#
#
#
# if __name__ == '__main__':
# # 在windows环境下, 开启进程必须在 __name__ == '__main__' 下面
# # p = Process(target=task,args=('常鑫',)) # 创建一个进程对象
# p = Process(target=task,args=('常鑫',),name='alex') # 创建一个进程对象
# p.start()
# # time.sleep(1)
# # p.terminate() # 杀死子进程 ***
# # p.join() # ***
# # time.sleep(0.5)
# # print(p.is_alive()) # ***
# # print(p.name)
# p.name = 'sb'
# print(p.name)
# print('==主开始')
守护进程
#子进程守护主进程,只要结束主进程,子进程也随之结束
x.daemon=True
from multiprocessing import Process
import time
def func(name):
print(f'{name}开始')
time.sleep(1)
print(f'{name}结束')
if __name__ == '__main__':
x.Process(target=func,args('song'))
x.daemon=True #在发送信号之前开启 守护进程,将X子进程设置成守护进程,只要主进程结束,守护进程马上结束
x.start()
x.join()
print('666主进程开始')
僵尸进程与孤儿进程(面试会问)
- 基于unix环境(Linux,macOS)
- 主进程需要等待子进程结束之后,主进程才结束
主进程时刻监测子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收/
为什么主进程不在子进程结束会对其马上回收?
- 主进程与子进程是异步关系,主进程无法马上捕获子进程什么时候结束
- 如果子进程结束之后马上在内存中释放资源,主进程就没办法监测子进程的状态了
Unix针对于上面的问题,提供了一个机制
所有的子进程结束之后,立马回释放掉文件的操作链接,内存的大部分数据,会保留一些内容,进程号,结束时间,运行状态,等待主进程监测,回收
僵尸进程:所有的子进程结束之后,在准备回收之前,都会进入僵尸进程状态
僵尸进程有危害:如果父进程不对僵尸进程进行回收(wait//waitpid),产生大量的僵尸进程,这样就会占用内存,占用进程pid号
孤儿进程:父进程由于某种原因结束了,但是你的子进程还在运行中,这样你的这些子进程就成了孤儿进程,.你的父进程如果结束了.你的所有的孤儿进程就会被inIT进程回收,init就变成了你的父进程,对孤儿进程进行回收
僵尸进程如何解决::::
父进程产生了大量的子进程,但是不回收,这样就会形成大量的僵尸进程,解决方式就是直接杀死父进程将所有的僵尸进程变成孤儿进程然后init进行照顾,由init进程进行回收
互斥锁
三个同事,同时用一个打印机打印内容
模拟三个进程模仿三个同事,输出平台模拟打印机
from multiprocessing import Process
import time,random
def func1(name):
print(f'{name}开始')
time.sleep(random.randint(1, 3))
print(f'{name}结束')
def func2(name):
print(f'{name}开始')
time.sleep(random.randint(1, 3))
print(f'{name}结束')
def func3(name):
print(f'{name}开始')
time.sleep(random.randint(1, 3))
print(f'{name}结束')
if __name__ == '__main__':
x1=Process(target=func1,args=('宋',))
x2=Process(target=func2,args=('佳',))
x3=Process(target=func3,args=('凡',))
x1.start()
x2.start()
x3.start()
#并发的抢占打印机
#并发效率优先,但是不是顺序优先
#多个进程共枪一个资源,需要保证顺序,一个一个来 串行
版本二 加join 变成串行,但是这个顺序是固定的,
版本三
from multiprocessing import Process,Lock
import time,random
def func1(lock,name):
lock.acquire()
print(f'{name}开始')
time.sleep(random.randint(1,3))
print(f'{name}结束')
lock.release()
def func2(lock,name):
lock.acquire()
print(f'{name}开始')
time.sleep(random.randint(1,3))
print(f'{name}结束')
lock.release()
def func3(lock,name):
lock.acquire()
print(f'{name}开始')
time.sleep(random.randint(1,3))
print(f'{name}结束')
lock.release()
if __name__ == '__main__':
lock=Lock()
x1=Process(target=func1,args=(lock,'宋',))
x2=Process(target=func2,args=(lock,'佳',))
x3=Process(target=func3,args=(lock,'凡',))
x1.start()
x2.start()
x3.start()
#保证了顺序,保证了先到先得的原则,串行
join和lock的区别
共同点:都可以阻塞,都可以把并发变成并发,保证了顺序
不同点:join是人为的设定顺序,lock让其争抢顺序,保证了公平性
进程之间的通信:
进程在内存级别是隔离的,但是文件在磁盘上
基于文件通信
抢票系统 #思路分析:先可以查票,查询剩余票数 并发 都可以查 #购买,向服务器发送请求,服务端接受,在后端将票数修改,返回给前端,串行 一个一个的买 from multiprocessing import Process,Lock import os,time,random,json def search(): time.sleep(random.randint(1,3)) with open('song.json','r',encoding='utf-8')as f: dic=json.load(f) print(f'{os.getpid()}查看了票,还剩{dic["count"]}张票') def buy(): with open('song.json','r',encoding='utf-8')as f1: dic=json.load(f1) if dic['count']>0: dic['count']-=1 time.sleep(random.randint(1,3)) with open('song.json','w',encoding='utf-8')as f2: json.dump(dic,f2) print(f'{os.getpid()}购票成功') def run(lock): search() lock.acquire() buy() lock.release() if __name__ == '__main__': lock=Lock() for i in range(6): x= Process(target=run,args=(lock,)) x.start() #当多个进程共枪一个数据时,保证数据安全,必须加锁和串行 #互斥锁 可以公平的保证顺序的问题 以及数据的安全 #基于文件的进程之间的通信 效率低 自己加锁麻烦而且很容易出现死锁
基于队列通信
from multiprocessing import Queue q=Queue(3) #里面可以限制最大有几个 def func(): print(1) q.put(1) #往队列里面放入1 q.put('adfadf') q.put(func) '啥也能放' q.get() q.get() print(q.get()) '一般放几个就取几个' print(q.get(timeout=3)) #阻塞三秒,给他计时,三秒之后还阻塞就主动抛出错误 队列:把队列理解成一个容器,这个容器可以承载一些数据 特性:先进先出 永远保持这个数据,FIFO 羽毛球筒 #如果队列里面是限制三个数据 要是放四个数据 或是取四个数据 会阻塞 #解除阻塞 在别的进程里面取出来 或者添加 print(q.get(block=False)) #只要遇到阻塞就会报错 print(q.get(timeout=2)) q.put(3,block=False) q.put(3,timeout=2) 都报错
基于管道通信
线程
生产者与消费者模型
from multiprocessing import Process,Queue import time,random def producer(q,name): for i in range(1,6): ret=f'第{i}个包子' q.put(ret) print(f'生产者:{name},生产的{ret}') def consumer(q,name): while 1: try: s1=q.get(timeout=2) time.sleep(random.randint(1,3)) print(f'消费者:{name},吃了{s1}') except Exception: return if __name__=='__main__': q=Queue() x=Process(target=producer,args=(q,'宋')) x1=Process(target=consumer,args=(q,'李')) x.start() x1.start()
生产者消费者模型:编程思想,设计理念,理论等等,都是交给你一种编程的方法,以后遇到类似的情况,套用即可
生产者消费者模型三要素:
生产者:产生数据的
消费者:接受数据做进一步处理
容器:队列
队列容易起到的作用:起到缓冲作用,平衡生产力和消费力 ,解耦
线程的理论知识
1.什么是线程
是一条流水线的工作流程
进程:在内存中开启一个进程空间,然后将主进程的所有资源数据复制一份,然后调用CPU去执行这些代码
开启一个进程:在内存中开启一个进程空间,然后将主进程的所有资源数据复制一份,然后调用线程去执行代码
进程是资源单位,线程是执行单位
如何正确描述开启一个进程
开启一个进程:进程会在内存,中开辟一个进程空间,将主进程的资源数据去复制一份,线程会执行里面的代码
进程vs线程:
开启进程的开销非常大,比开启线程的开销大很多
开启线程的速度非常快,要比进程快几十上百倍
from threading import Thread # from multiprocessing import Process # import os # # def work(): # print('hello') # # if __name__ == '__main__': # #在主进程下开启线程 # t=Process(target=work) # t.start() # print('主线程/主进程') # 多线程 # from threading import Thread # import time # # def task(name): # print(f'{name} is running') # time.sleep(1) # print(f'{name} is gone') # # # # if __name__ == '__main__': # # t1 = Thread(target=task,args=('海狗',)) # t1.start() # print('===主线程') # 线程是没有主次之分的.
线程线程之间可以共享数据,进程与进程之间之间需要借助队列等办法实现通信
# from threading import Thread
# import os
#
# x = 3
# def task():
# global x
# x = 100
#
# if __name__ == '__main__':
#
# t1 = Thread(target=task)
# t1.start()
# t1.join()
# print(f'===主线程{x}')
# 同一进程内的资源数据对于这个进程的多个线程来说是共享的.
线程的应用:
- 并发:一个CPU看起来像是同时执行多个任务
- 单个进程开启三个线程,并发的执行任务
- 开启三个进程并发的执行任务
- 文本编辑器:
- 输入而文字
- 子啊屏幕上显示
- 保存在磁盘中
- 开启多线程就非常好了:数据共享,开销小,速度快
- 主线程子线程没有地位之分,但是一个进程谁在干活.:一个主线程在干活,当干完活了你需要等待其他线程干完活之后,才能结束本进程
开启线程的两种方式
#第一种
from threading import Thread
import time
def func():
print('子线程开')
time.sleep(1)
print('子线程关')
if __name__=='__main__':
x=Thread(target=func)
x.start()
print('主线程开')
#一般是子线程先开再开主线程
#第二种
from threading import Thread
import time
class A(Thread):
def __init__(self,name):
super().__init__()
self.name=name
def run(self): #必须要有run这个方法
print('子线程开')
time.sleep(1)
print('子线程关')
x=A('song') #可以不写__name__=='__main__'
x.start()
print('主线程开')
线程其他方法(了解
from threading import Thread,enumerate,activeCount,CurrentThread
import time,os,random
def func():
print(666)
if __name__ == '__main__=':
x=Thread(target=func)
x.start()
print(x.name) #查询name属性 父类中是 Thread-1
x.steName('song') #添加name属性或者修改name属性
print(x.getname()) #查获name属性
print(x.isAlive) #返回线程是否活动的
print(currentThread()) #返回当前的线程变量
print(enumernate()) #返回一个包换正在运功的线程列表list 线程启动后,结束前
print(activecount()) #返回正在运行的线程数量
#主线程等待子线程结束
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)
if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.start()
t.join()
print('主线程')
print(t.is_alive())
'''
egon say hello
主线程
False
'''
join与守护线程(考点)
#join 阻塞 告知主线程要等待子线程执行完毕后再执行主线程
from threading import Thread
import time
#
def task(name):
print(f'{name} is running')
time.sleep(1)
print(f'{name} is gone')
#
#
if __name__ == '__main__':
start_time = time.time()
t1 = Thread(target=task,args=('海狗',))
t2 = Thread(target=task,args=('海狗1',))
t3 = Thread(target=task,args=('海狗2',))
#
t1.start()
t1.join()
t2.start()
t2.join()
t3.start()
t3.join()
print(f'===主线程{time.time() - start_time}') # 线程是没有主次之分的.
$$$$守护线程
from multiprocessing import Process
import os,time,random
def func():
print('子进程开')
time.sleep(1)
print('子进程罐')
def foo():
print('1 kai')
time.sleep(1)
print('1 guan')
if __name__=='__main__':
x=Process(target=func)
x1=Process(target=foo)
x.daemon=True
x.start()
x1.start()
time.sleep(1) #注意没有睡眠一秒 就会直接不执行x子进程
print('主进程开')
$$$$守护线程
from threading import Thread
import time
def func(name):
print('zou')
time.sleep(2)
print('gunle')
if __name__ == '__main__':
x=Thread(target=func,args=('宋',))
x1=Thread(target=func,args=('反',))
x1.daemon=True
x.start()
x1.start()
print('主线程开')
#主线程什么时候结束:
非守护进程与主线程结束之后,再结束
互斥锁(线程 考点)
from threading import Thread
import time
import random
x = 10
def task():
time.sleep(random.randint(1,2))
global x
temp = x
time.sleep(random.randint(1, 3))
temp = temp - 1
x = temp
if __name__ == '__main__':
l1 = []
for i in range(10):
t = Thread(target=task)
l1.append(t)
t.start()
for i in l1:
i.join()
print(f'主线程{x}')
#多个任务共抢一个数据,保证数据的安全的目的,要让其串行
from threading import Thread
from threading import Lock
import time
import random
x = 100
def task(lock):
lock.acquire()
# time.sleep(random.randint(1,2))
global x
temp = x
time.sleep(0.01)
temp = temp - 1
x = temp
lock.release()
if __name__ == '__main__':
mutex = Lock()
l1 = []
for i in range(100):
t = Thread(target=task,args=(mutex,))
l1.append(t)
t.start()
time.sleep(3)
print(f'主线程{x}')
死锁现象递归锁(RLock
from threading import Thread,Lock
import time
locka=Lock()
lockb=Lock()
class A(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
locka.acquire()
print(f'{self.name}拿到a')
lockb.acquire()
print(f'{self.name}拿到b')
locka.release()
lockb.release()
def f2(self):
lockb.acquire()
print(f'{self.name}拿到b')
time.sleep(0.1)
locka.acquire()
print(f'{self.name}拿到a')
lockb.release()
locka.release()
if __name__ == '__main__':
for i in range(3):
x = A()
x.start()
解决死锁用递归锁就可以,递归锁有一个计数功能,上锁加1,解锁-1
只要递归锁上面的数字不为零,其他线程就不能抢锁
from threading import Thread,RLock
import time
locka=lockb=RLock()
class A(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
locka.acquire()
print(f'{self.name}拿到a')
lockb.acquire()
print(f'{self.name}拿到b')
locka.release()
lockb.release()
def f2(self):
lockb.acquire()
print(f'{self.name}拿到b')
time.sleep(0.1)
locka.acquire()
print(f'{self.name}拿到a')
lockb.release()
locka.release()
if __name__ == '__main__':
for i in range(3):
x = A()
x.start()
# RLock 递归锁使用方法 locka=loakb=RLock()
$$$递归锁可以解决死锁现象,业务需要多个锁时,要先考虑递归锁
信号量(Semaphore)
也是一种锁,控制并发数量
from threading import Thread,Semaphore,currentThread
import time ,random
s=Semaphore(5) #Semaphore 赋值一个变量 限制分次数,然后用这个带上锁
def func():
s.acquire()
print(f'{currentThread().name}厕所ing')
time.sleep(random.randint(1,3))
s.release()
if __name__ == '__main__':
for i in range(10):
x=Thread(target=func,)
x.start()
GIL全局解释器锁
理论上来说,单个进程的多线程可以利用多核,但是开发cpython解释器的程序员,给进入解释器的线程加了锁
为什么加锁,优缺点
当时都是单核时代,而且CPU价格非常贵,
如果不加全局解释器锁,开发cpython解释器的程序员就会在源码内部各种主动加锁,解锁,非常麻烦,各种死锁现象,为了省事,直接进入解释器时给线程加了一个锁
优点:保证了cpython解释器的数据资源的安全
缺点:单个进程的多线程不能利用多核
jpython以及pypy没有GIL锁
多核时代,将GIL锁去掉可以吗?
因为cpython解释器的所有的业务逻辑都是围绕着单个线程实现的,去掉这个GIL锁,几乎不可能
单个进程的多线程可以并发,但是不能利用多核,不能并行
多个进程可以并行,并发
GIL与Lock锁的区别
相同点:都是同种锁,互斥锁
不同点:GIL全局解释器锁,保护解释器内部的资源数据的安全
GIL锁,上锁 释放 无需手动操作
自己代码中定义的互斥锁保护进程中的资源数据的安全
自己定义的互斥锁必须自己手动上锁,释放锁
io计算密集型
io密集型
#io密集型:单个进程的多线程合适 并发执行
计算密集型
计算密集型:适合多进程的并行
验证io计算密集型
from threading import Thread
from multiprocessing import Process
import time,random
# 计算密集型:单个进程的多线程并发vs进程的并发执行
def func():
count=0
for i in range(100000000):
count+=1
if __name__ == '__main__':
times=time.time()
lst=[]
for i in range(4):
x=Process(target=func)
lst.append(x)
x.start()
for i in lst:
i.join()
print(f'{time.time()-times}') #17.108332633972168
if __name__ == '__main__':
times=time.time()
lst=[]
for i in range(4):
x=Thread(target=func)
lst.append(x)
x.start()
for i in lst:
i.join()
print(f'{time.time()-times}') #30.25704336166382
总结:计算密集型,还是用多进程的并发效率高(前提是数据比较大
#io密集型
# io密集型 单个进程的多线程并发vs多个进程的并发并行
from threading import Thread
from multiprocessing import Process
import time,random
def func():
count=0
time.sleep(random.randint(1,3))
count+=1
if __name__ == '__main__':
times=time.time()
li=[]
for i in range(4):
x=Process(target=func)
li.append(x)
x.start()
for i in li:
i.join()
print(f'{time.time()-times}') #多进程3.244175910949707
if __name__ == '__main__':
times=time.time()
li=[]
for i in range(4):
x=Thread(target=func)
li.append(x)
x.start()
for i in li:
i.join()
print(f'{time.time()-times}') #多线程2.002326250076294
计算io密集型的还是用多线程的并发合适效率高
多线程实现socket通信
无论是多线程还是多进程,如果按照上面的写法,来一个
客户端请求,我就开一个线程,来一个请求开一个线程,
应该是这样: 你的计算机允许范围内,开启的线程进程数
量越多越好
#服务端
import socket,time
from threading import Thread
def foo(conn,addr):
while 1:
try:
s=conn.recv(1024).decode('utf-8')
print(f'{addr[1]}{s}')
time.sleep(1)
s1=input('<<<<').strip().encode('utf-8')
conn.send(s1)
except Exception:
break
conn.close()
def func():
server = socket.socket()
server.bind(('127.0.0.1', 8848))
server.listen(5)
while 1:
conn, addr = server.accept()
x=Thread(target=foo,args=(conn,addr))
x.start()
if __name__ == '__main__':
func()
#客户端
import socket
client=socket.socket()
client.connect(('127.0.0.1',8890))
while 1:
s=input('<<<<').strip().encode('utf-8')
client.send(s)
s1=client.recv(1024).decode('utf-8')
print(f'{s1}')
client.close()
进程池,线程池(ProcessPoolExecutor,ThreadPoolExecutor)
线程池:一个容器,这个容器限制住你开启线程的数量,比如四个,第一次肯定只能并发的处理四个任务,只要有任务完成,线程马上就会接下一个任务.以时间换空间
#默认进程池,是你的计算机cpu核数
#默认线程池 是你的计算机CPU核数 *5 乘5
#查看cpu核数 import os
# print(os.cpu_count())
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
print(os.cpu_count()) #查看计算机核数
def func():
print(f'{os.getpid()}接客')
time.sleep(random.randint(1,2))
if __name__ == '__main__': #进程池(并行+并发
p=ProcessPoolExecutor() # 默认不写,进程池里面的进程数与cpu个数相等
for i in range(5):
p.submit(func,)
if __name__ == '__main__': #线程池(并发
p=ThreadPoolExecutor(3) # 默认不写, cpu个数*5 线程数
p.submit(func)
p.submit(func)
p.submit(func)
p.submit(func)
p.submit(func)
p.submit(func)