Python并发编程

1. 进程、线程、协程

进程是cpu资源分配的最小单位,进程是正在进行的一个过程或者说一个任务。
线程是cpu调度的最小单位。
协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。(单线程下的并发)
进程线程区别:

  • 线程开销小
  • 主进程下多个进程PID不同;主进程下多个线程PID与主进程相同
  • 进程间地址空间相隔离;同一个进程内多个线程共享该进程的地址资源。

2. 僵尸进程

子进程结束后,子进程的状态信息仍然保存在系统中,直到父进程结束。(所有进程都会经历僵尸进程)

3. 孤儿进程

父进程已经退出,但它的一个或多个子进程还在运行, 这些进程就会成为孤儿进程,由init进程收养。

4. 守护进程

守护进程会在主进程代码执行结束后就终止。

5. 创建子进程

方式一:

import time
from multiprocessing import Process


def task(name):  # 子进程执行的任务
    print('%s is running' % name)
    time.sleep(3)
    print('%s is done' % name)


if __name__ == '__main__':

    # 实例化得到对象
    p1 = Process(target=task, args=('子进程1',))  # 必须加逗号
    p2 = Process(target=task, kwargs={'name': '子进程2'})

    # 调用对象下的方法,开启进程
    p1.start()  # 仅是给操作系统发送信号, 由操作系统开启子进程
    p2.start()

    print('主')

方式二:

import time
from multiprocessing import Process

class MyProcess(Process):

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # 实际上是start,但是必须叫run
        print('%s is running' % self.name)
        time.sleep(3)
        print('%s is done' % self.name)


if __name__ == '__main__':

    p1 = MyProcess('子进程1')
    p2 = MyProcess('子进程2')

    p1.start()  # 启动进程,并调用该子进程中的p.run() 
    p2.start()

    print('主')

6. 查看进程的相关参数

进程名:print(p.name)
进程PID: print(os.getpid) print(p.pid)
父进程PID:print(os.getppid)

7. 相关函数

等待进程运行结束:p.join()
查看进程是否在运行:print(p.is_alive())
终止进程:p.terminate() 进程结束时间由操作系统决定。
终止进程但不进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程;如果p还保存了一个锁那么也将不会被释放,进而导致死锁。
开启守护进程:p.daemon = True 必须在p.start()之前设置,守护进程将被禁止创建子进程。

8. 互斥锁

例子:

from multiprocessing import Process, Lock
import time


def task(name, lock):
    lock.acquire()
    print('%s 1' % name)
    time.sleep(1)
    print('%s 2' % name)
    time.sleep(1)
    print('%s 3' % name)
    lock.release()


if __name__ == '__main__':

    lock = Lock()
    for i in range(3):
        p = Process(target=task, args=('进程%s' % i, lock))
        p.start()

9. 队列 Queue

例子:

from multiprocessing import Queue

que = Queue(3)

que.put(1)
que.put({'a': 'hello'})
que.put([1, 2, 3])
print(que.full())  # 判断队列是否已满
# que.put(4) #再放就阻塞住了

print(que.get())
print(que.get())
print(que.get())
print(que.empty())  # 队列是否空了
# print(que.get())  # 再取就阻塞住了

10. 队列 JoinableQueue

例子:

from multiprocessing import Process, JoinableQueue
import time
import random


def producer(q, name, food):
    for i in range(2):
        res = '%s%s' % (food, i)
        time.sleep(random.randint(1, 3))
        print('\033[34m%s 生产了 %s\033[0m' % (name, res))
        q.put(res)  # 入队
    q.join()  # 等到消费者把队列中的所有的数据都取走之后,生产者才结束


def consumer(q, name):
    while True:
        res = q.get()  # 出队
        time.sleep(random.randint(1, 3))
        print('\033[32m%s 吃 %s\033[0m' % (name, res))
        q.task_done()  # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了


if __name__ == '__main__':

    que = JoinableQueue()

    # 3个生产者
    p1 = Process(target=producer, args=(que, '生产者1', '包子'))
    p2 = Process(target=producer, args=(que, '生产者2', '饺子'))
    p3 = Process(target=producer, args=(que, '生产者3', '月饼'))

    # 2个消费者
    c1 = Process(target=consumer, args=(que, '消费者1'))
    c2 = Process(target=consumer, args=(que, '消费者2'))
    c1.daemon = True  # 将消费者设为守护进程
    c2.daemon = True  # 消费者在生产者结束后, 随主进程一起结束

    l1 = [p1, p2, p3]
    l2 = [c1, c2]

    # 开始生产
    for p in l1:
        p.start()

    # 开始消费
    for c in l2:
        c.start()

    # 主进程等生产者p1、p2、p3结束
    # 而p1、p2、p3在消费者把所有数据都取干净之后结束
    for p in l1:
        p.join()
    print('主')

11. 协程

使用greenlet库:

from greenlet import greenlet

def eat(name):
    print('%s eat 1' % name)
    g2.switch('egon')  # 传入第一次参数, 之后不用再传
    print('%s eat 2' % name)
    g2.switch()
    
    
def play(name):
    print('%s play 1' % name)
    g1.switch()
    print('%s play 2' % name)


g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch('egon')  # 在第一次switch时传入参数,以后都不需要

使用gevent(无法识别socket time等模块的阻塞,需要使用gevent自带的阻塞)

import gevent


def eat(name):
    print('%s eat 1' % name)
    gevent.sleep(2)  # gevent识别到阻塞,进行切换
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1 = gevent.spawn(eat, 'egon')
g2 = gevent.spawn(play, name='egon')
g1.join()
g2.join()
# gevent.joinall([g1,g2])  # 等待列表中所有协程对象结束
print('主')

导入monkey 可识别socket time等阻塞:

from gevent import monkey; monkey.patch_all()
# patch_all()必须放在导入socket、time等模块前,否则gevent无法识别socket、time的阻塞
import gevent
import time


def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')


def play():
    print('play 1')
    time.sleep(1)
    print('play 2')


g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1, g2])
print('主')

12. 进程池、线程池

导入模块:

from concurrent.futures import ProcessPoolExecutor  # 进程池
from concurrent.futures import ThreadPoolExecutor  # 线程池

创建进程/线程池:

executor = ProcessPoolExecutor(max_worker=3)  # 进程池
executor = ThreadPoolExecutor(max_workers=3)  #线程池

将进程/线程放入池内:

future = exector.submit(task, parm)

"""
executor.map(task, range(1,12))
相当于:
for i in range(11):
    exector.submit(task, i)
"""

关闭进程/线程池:

exector.shutdown()  # 默认wait参数为True

wait=True 等待池内所有任务执行完毕回收完资源后再执行后续代码。
wait=False 不join,直接执行后续代码。
关闭进程/线程池后,不允许再向已关闭的进程/线程池内加入进程/线程。

拿到进程/线程运行结果:

print(future.result())

回调函数:
future.add_done_callback(func) futuretask结束后,会自动把future对象当做参数传给回调函数func。

例子:

from concurrent.futures import ProcessPoolExecutor
import time
import random


def task(name):
    print("%s is running" % name)
    time.sleep(random.randint(3, 5))
    return name


def func(future):
    name = future.result()
    print("%s's callback function" % name)


if __name__ == '__main__':
    executor = ProcessPoolExecutor(5)
    futures = []

    for i in range(3):
        future = executor.submit(task, "task"+str(i))
        future.add_done_callback(func)
        futures.append(future)

    executor.shutdown(True)

    print("主") 

13. IO模型

  • IO涉及的两个对象:进程/线程内核
  • IO操作的两个阶段:
    1. 等待数据准备
    2. 将数据从内存拷贝到进程中
  • 5种IO模型:
    • 阻塞IO(blocking IO): 在内核将数据准备好之前,系统调用会一直等待。
    • 非阻塞IO(nonblocking IO): 每次客户询问内核是否有数据准备好,当有数据报准备好时,就进行拷贝数据报的操作,当没有数据报准备好时,也不阻塞程序,内核直接返回未准备就绪的信号,等待用户程序的下一个轮寻。
    • 多路复用IO(IO multiplexing): 多路复用IO属于阻塞IO,但可以对多个文件描述符进行阻塞监听,所以效率较阻塞IO的高。
    • 信号驱动IO(signal driven IO): 在信号驱动IO模型中,当用户线程发起一个IO请求操作,会给对应的socket注册一个信号函数,然后用户线程会继续执行,当内核数据就绪时会发送一个信号给用户线程,用户线程接收到信号之后,便在信号函数中调用IO读写操作来进行实际的IO请求操作。
    • 异步IO(asynchronous IO): 数据准备、将数据从内从拷贝到进程均由内核完成,完成后告知进程该IO操作已完成。期间程序不阻塞。

阻塞程度:阻塞IO>非阻塞IO>多路转接IO>信号驱动IO>异步IO,效率是由低到高。

14. 信号量

信号量也是一把锁,但同一时间可以被指定大小的任务获取。

模块导入:

from threading import Semaphore

设置计数器大小:

sm = Semaphore(value=1)  # 计数器大小默认为1

两个主要的方法:

acquire()  # 内置计数器-1, 当计数器为0时阻塞,等待其他线程调用release()
release()  # 内置计数器+1

例子:

from threading import Thread, Semaphore
import threading
import time


def func():
    sm.acquire()
    print('%s get sm' % threading.current_thread().getName())
    time.sleep(3)
    sm.release()


if __name__ == '__main__':
    sm = Semaphore(5)
    for _ in range(23):
        t = Thread(target=func)
        t.start()

15. Event

Event对象:
用于线程间通信,即程序中的其一个线程需要通过判断某个线程的状态来确定自己下一步的操作,就用到了Event对象。

模块导入:

from threading import Event

创建Event对象:

event = Event()

相关方法:

event.isSet()  # 返回event的状态值
event.wait([maxtime])  # 如果 event.isSet() == False将阻塞线程, [maxtime]-超时时间
event.set()  # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度
event.clear()  # 恢复event的状态值为False

例子:

from threading import Thread, Event
import threading
import time
import random


def conn_mysql():
    count = 1
    while not event.is_set():
        if count > 3:
            print('\033[31m[%s]连接失败\033[0m' % threading.current_thread().getName())
            exit(0)
        print('<%s>第%s次尝试连接' % (threading.current_thread().getName(), count))
        event.wait(1)
        count += 1
    print('<%s>连接成功' % threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(1, 4))
    event.set()


if __name__ == '__main__':
    event = Event()
    conn1 = Thread(target=conn_mysql)
    conn2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

16. 定时器

定时器Timer:指定n秒后执行某项操作(不会像sleep一样阻塞)
例子:

from threading import Timer


def hello():
    print("hello, world")


t = Timer(1, hello)  // 一秒后执行hello函数
t.start()

17. 递归锁

死锁现象: 指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,这种永远在互相等待的进程称为死锁进程。
递归锁: RLock可以连续acquire多次。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

例子:

from threading import Thread, RLock
import time

mutexA = mutexB = RLock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A锁\033[0m' % self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B锁\033[0m' % self.name)

        mutexB.release()
        print('\033[41m%s 释放A锁\033[0m' % self.name)

        mutexA.release()
        print('\033[42m%s 释放B锁\033[0m' % self.name)

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B锁\033[0m' % self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A锁\033[0m' % self.name)

        mutexA.release()
        print('\033[43m%s 释放B锁\033[0m' % self.name)

        mutexB.release()
        print('\033[44m%s 释放A锁\033[0m' % self.name)


if __name__ == '__main__':
    for _ in range(4):
        t = MyThread()
        t.start()

18. GIL全局解释器锁

cpython中引进GIL,保证同一时刻同一进程中只有一个线程被执行,获取锁并获取资源,避免了多线程并发执行,保证了线程的安全,但无法使用多核优势。

结论:

多线程用于IO密集型
多进程用于计算密集型

    原文作者:Raina_RLN
    原文地址: https://www.cnblogs.com/raina/p/11175565.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞