python - OS相关

前言

我是偏后台开发的coder,学到python的这里时尤其的关注。操作系统的相关接口在python是不是比linux C中要简洁的多。OS的概念不说了,这次笔记集中关注python中多进程、多线程、高并发、加锁同步、进程间通信等实现。

Definition

进程(process),在我的理解中,就是一个任务,是一段运行的程序。后台的童鞋应该知道其本质就是一个task_struct结构体,里面记载着程序运行需要的所有资源和他自身的信息。当他获得运行所需的内存、CPU资源等,也就是成为了一个running状态的进程。

可以把进程理解为一个任务,那线程就是完成这个任务的执行流。线程是CPU调度的最小粒度。通常来说,现在的项目中,至少我接触的,一个进程中都包括着不止一个的线程。毕竟现在的OS都是SMP的,充分利用多核心提高程序效率应该是每个coder敲键盘时需要优先考虑的。

多进程

linux的内核向外提供了 fork() 这个系统调用来创建一个本进程的拷贝,当然往往fork()后都跟着 exec() 族系统调用,我们创建一个进程一般都是为了执行其他的代码程序。

python的 os 模块封装了很多常用的系统调用,可以说是python中最常用的一个库了。举个栗子:

import os

print('Process (%s) start...' % os.getpid())

pid = os.fork()
if pid == 0:
    print('Child process (%s).' % os.getpid())
else:
    print('Parent process (%s).' %  pid)

fork() 会返回两个结果,父进程返回一个大于0的无符号数,子进程返回0。

我们都知道socket()是有好几个步骤的,而对于web服务器,每天每时每分都有着成千上万的访问请求。如果是一个进程向外提供服务,那就是这个进程为第一个用户从创建socket到关闭,再为下一个用户提供服务。用户时排着队接受服务的,显然不符合逻辑。

拿Apache举个栗子,它是多进程架构服务器的代表。

  1. 运行主程序,只负责server端socket的listen()accept(),当然主进程是一个守护进程
  2. 每当一个用户请求服务,就会调用fork(),在子程序中接受数据,read()或者write(),然后提供服务直至关闭
  3. 主进程还是要负责回收结束的子进程资源的

伪代码如下:

import os

server_fd = socket()
bind(server_fd,ip,port)
listen(server_fd,MAX_PROCESS)
While Online:
    connfd = accpet(server_fd)
    for each connfd:
        os.fork()
        // TODO
close(server_fd)

上面这段程序只适用linux平台,windows平台创建进程的方式并不是 fork() 调用。python中提供了multiprocesssing模块来兼容windows,比起fork(),代码的语义更好理解一些

from multiprocessing import Process
import os

def run_proc(name):
    print('Child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    #创建Process实例
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

这里的join语义和linux平台的多线程中的join语义很像,但效果其实是linux平台的wait

有时候需要进程池,multiprocessing 也直接提供了pool用于创建。

pool.apply(func,params) 是单进程阻塞模式
pool.apply_async(func,params,callback)  是多进程异步模式
pool.map(func,iter) 用于可迭代结构,阻塞式调用
pool.map_async(func,iter,callback)

一般情况下,还是把进程数控制成和CPU核数相同。pool结束调用pool.join()回收进程资源时,需要先pool.close()

上面提到过,创建一个新进程的原因往往是为了加载新的代码,去执行新的任务。所以python封装了fork()和之后的exec族,提供subprocess模块,直接操作新的子进程。这个包,一般是用来执行外部的命令或者程序如shell命令,和os.system()类似。

import subprocess

r = subprocess.call(['ls','-l'])    #阻塞
r = subprocess.call('ls -l',shell = True)
r = subprocess.check_call(['ls','-l'])  #returncode不为0则raise CalledProcessError异常
r = subprocess.check_output('ls -l',shell=True)
r = subprocess.Popen(['ls','-l'])   #非阻塞,需主动wait

r = subprocess.Popen(['ls','-l'],stdin=child1.stdout,stdout=subprocess.PIPE, stderr=subprocess.PIPE)    #设置标准输入输出出错的句柄
out,err = r.communicate()   #继续输入,或者用来获得返回的元组(stdoutdata,stderrdata)

手动继续输入的例子:

import subprocess

print('$ python')
p = subprocess.Popen(['python'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b"print('Hello,world')")
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

进程间通信

multiprocessingQueue或者Pipe来帮助实现,类似linux中的Pipe,打开一条管道,一个进程往里面扔数据,一个从另一头捡数据。python中的Pipe是全双工管道,既可以读也可以写。可以通过Pipe(duplex=False)创建半双工管道。

from multiprocessing import Pipe,Queue
#实例
q = Queue()
p = Pipe()
#写入数据
q.put(value)
p[0].send(value)
#读数据
q.get()
p[1].recv()

分别举个例子,用Queue

from multiprocessing import Process, Queue
import os, time, random

def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A','B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        time.sleep(random.random())
        print('Get %s from queue.' % value)

if __name__=='__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))

    pw.start()
    pr.start()
    pw.join()
    pr.terminate()

用Pipe:

from multiprocessing import Process, Pipe
import os, time, random

def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A','B', 'C']:
        print('Put %s to pipe...' % value)
        q.send(value)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.recv()
        time.sleep(random.random())
        print('Get %s from pipe.' % value)

if __name__=='__main__':
    p = Pipe()
    pw = Process(target=write, args=(p[0],))
    pr = Process(target=read, args=(p[1],))

    pw.start()
    pr.start()
    pw.join()
    time.sleep(2)
    pr.terminate()

多线程

有人会有疑问,问什么要在进程中开多个线程,多创建几个进程一起干活不就行了。其实这样是可以的,只不过进程这个单位有点大,比较占用资源,创建的时候开销比较大(尤其在windows系统下),进程多了CPU调度起来,在进程间切换也是非常耗时的。还有多任务协同合作时,需要数据交换,进程间通信也是开销,而一个进程中的线程是共享进程的内存空间的,可以直接交互。所以现在多线程的程序更加常见。

不过多线程也是有弊端的,协同合作的多线程,有一个挂了,会影响到所有的其他线程,也就代表这个任务是做不下去了。进程因为有着独立的地址空间,所以一个进程死了对其他进程的影响可以说很小。

python中提供了threading模块为多线程服务,threading.current_thread()返回当前线程,主线程名为MainThread

import threading

thread = threading.Thread(target=func,args=())
thread.start()
thread.join()

多线程编程,最重要的就是同步和互斥,也就是各种锁的用法。为什么要用锁,后台的童鞋应该都懂,现在的SMP操作系统都是抢占式内核,也就是即使你不同的核共同工作时,很幸运的没有改乱一个共享变量,当然这就不可能了。当你的CPU时间片到时间了,或者需要内存或者IO资源,你被踢出了CPU的工作队列,你必须得在走的时候给你的资源把锁加上,下次再来接着做。线程同步的重点的是对共享资源的判断,和选择合适的锁。也就是对什么资源加锁和用什么锁。

不过在python中很遗憾,多线程存在着天生的缺陷,因为有着GIL的存在,这是python解释器的设计缺陷。导致python程序在被解释时,只能有一个线程。不过,对于IO密集型的程序,多线程的设计还是很有帮助的。比如爬虫

  • 最常用的锁,类似 mutex
  • 条件变量,threading.Condition()会包含一个Lock对象,因为这两者一般都是配合使用的。
  • 信号量,threading.Semaphore()
import threading

lock = threading.Lock()
lock.acquire()
lock.realease()  #配合try...finally保证最后释放掉锁,防止死锁

cond = threading.Condition()
cond.wait()
cond.notify()   cond.notify_all()

sem = threading.Semaphore(NUM)
sem.acquire()
sem.realease()

event = threading.Event()   #相当于没有lock的cond
event.set(True)
event.clear()

假设以下的情况

thread_func(params):
    web_res = params
    def func1(web_res):
        http = web_res.http
        TODO
    def func2(web_res):
        data = web_res.data
        TODO
    def func3(web_res):
        user = web_res.user
        TODO

在一个线程中,又存在多个子线程或者函数时,需要把一个参数都传给它们时。可以通过唯一的id来区分出从全局变量自己的局部变量时。可以用ThreadLocal实现

import threading 

student = threading.local()

def func(name):
    person = student.name  #需要之前关联过

p1 = threading.Thread(target=func,argc='A')
p1 = threading.Thread(target=func,argc='B')

通过ThredLocal免去了我们亲自去字典中存取。通常用于web开发中的为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等。

分布式进程

分布式是为了在横向上提升整个系统的负载能力。python中multiprocessing模块中的manage子模块支持把多进程分布到不同的机器上。当然肯定存在一个master进程来负责任务的调度。依赖manage子模块,可以很轻松的写出分布式程序。

比如爬虫,想要爬下豆瓣或者知乎这样网站的全部数据,用单机估计得花费好几年。可以把需要爬的网站的所有URL放在一个Queue中,master进程负责Queue的管理,可以将很多设备与master进程所在的设备建立联系,爬虫开始获取URL时,都从主机器获取。这样就能保证协同不冲突的合作。

    原文作者:Allenware
    原文地址: https://www.jianshu.com/p/b2495685e98e
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞