select同时监控多个sockets,支持网络服务和多个客户端通信。
以下内容部分来源于:http://www.cnblogs.com/coser/archive/2012/01/06/2315216.html。
该模块可以访问大多数操作系统中的select()和poll()函数, Linux2.5+支持的epoll()和大多数BSD支持的kqueue()。请注意,在Windows上,它仅适用于select,在其他操作系统上,它也适用于其他类型的文件(特别是在Unix上,它还可以用于管道)。它不能用于确定常规文件是否变化。
Select:synchronous I/O multiplexing. select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。Select是跨平台的。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
Poll:wait for some event on a file descriptor。poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候 将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。不过poll并不适用于windows平台。
Epoll:I/O event notification facility。Linux 2.5.44以后支持,是性能最好的多路I/O就绪通知方法。epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描 述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进 行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制, 迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
Kqueue:暂不涉及。
使用select
select监控sockets,打开的文件,管道(任何有fileno()方法返回一个有效的文件描述符的东东),直到它们变成可读可写或发生通讯错误。它可以更容易地同时监控多个连接,比使用socket超时轮询更有效,因为监控在操作系统网络层而不是python解析器。注意windows的select并不支持打开文件。
下面代码通过select监控多个连接。注意通过server.setblocking(0)设置为非阻塞模式。select()的参数为3个列表:第一列表为读取输入数据的对象;第2个接收要发送的数据,第3个存放errors。select()返回的3个列表和输入类似:readable,writable,exceptional。
readable有3种可能:对于用来侦听连接主服务器socket,表示已准备好接受一个到来的连接;对于已经建立并发送数据的链接,表示有数据到来;如果没数据到来,表示链接已经关闭。
writable的情况:连接队列中有数据,发送下一条消息。如果队列中无数据,则从output队列中删除。
socket有错误,也要从output队列中删除。
select_echo_server.py
import selectimport socketimport sysimport Queue# Create a TCP/IP socketserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.setblocking(0)# Bind the socket to the portserver_address = ('localhost', 10000)print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)# Listen for incoming connectionsserver.listen(5)# Sockets from which we expect to readinputs = [ server ]# Sockets to which we expect to writeoutputs = [ ]# Outgoing message queues (socket:Queue)message_queues = {}while inputs:
# Wait for at least one of the sockets to be ready for processing
print >>sys.stderr, 'waiting for the next event'
readable, writable, exceptional = select.select(inputs, outputs, inputs)
# Handle inputs
for s in readable:
if s is server:
# A "readable" socket is ready to accept a connection
connection, client_address = s.accept()
print >>sys.stderr, ' connection from', client_address
connection.setblocking(0)
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection] = Queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print >>sys.stderr, ' received "%s" from %s' % \ (data, s.getpeername())
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result as closed connection
print >>sys.stderr, ' closing', client_address # Stop listening for input on the connection
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
# No messages waiting so stop checking for writability.
print >>sys.stderr, ' ', s.getpeername(), 'queue empty'
outputs.remove(s)
else:
print >>sys.stderr, ' sending "%s" to %s' % \ (next_msg, s.getpeername())
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print >>sys.stderr, 'exception condition on', s.getpeername()
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
select_echo_multiclient.py
import socketimport sysmessages = [ 'This is the message. ',
'It will be sent ',
'in parts.',
]server_address = ('localhost', 10000)# Create a TCP/IP socketsocks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]# Connect the socket to the port where the server is listeningprint >>sys.stderr, 'connecting to %s port %s' % server_addressfor s in socks:
s.connect(server_address)for message in messages:
# Send messages on both sockets
for s in socks:
print >>sys.stderr, '%s: sending "%s"' % \ (s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print >>sys.stderr, '%s: received "%s"' % \ (s.getsockname(), data)
if not data:
print >>sys.stderr, 'closing socket', s.getsockname()
s.close()
执行结果:
# ./select_echo_server.py
starting up on localhost port 10000
waiting for the next event
connection from ('127.0.0.1', 35424)waiting for the next event
connection from ('127.0.0.1', 35425)waiting for the next event
received "This is the message. " from ('127.0.0.1', 35424)
received "This is the message. " from ('127.0.0.1', 35425)waiting for the next event
sending "This is the message. " to ('127.0.0.1', 35424)
sending "This is the message. " to ('127.0.0.1', 35425)waiting for the next event ('127.0.0.1', 35424) queue empty ('127.0.0.1', 35425) queue empty
waiting for the next event
received "It will be sent " from ('127.0.0.1', 35424)
received "It will be sent " from ('127.0.0.1', 35425)waiting for the next event
sending "It will be sent " to ('127.0.0.1', 35424)
sending "It will be sent " to ('127.0.0.1', 35425)waiting for the next event ('127.0.0.1', 35424) queue empty ('127.0.0.1', 35425) queue empty
waiting for the next event
received "in parts." from ('127.0.0.1', 35424)
received "in parts." from ('127.0.0.1', 35425)waiting for the next event
sending "in parts." to ('127.0.0.1', 35424)
sending "in parts." to ('127.0.0.1', 35425)waiting for the next event ('127.0.0.1', 35424) queue empty ('127.0.0.1', 35425) queue empty
waiting for the next event
closing ('127.0.0.1', 35425)waiting for the next event
closing ('127.0.0.1', 35425)waiting for the next event
# ./select_echo_multiclient.py
connecting to localhost port 10000('127.0.0.1', 35424): sending "This is the message. "('127.0.0.1', 35425): sending "This is the message. "('127.0.0.1', 35424): received "This is the message. "('127.0.0.1', 35425): received "This is the message. "('127.0.0.1', 35424): sending "It will be sent "('127.0.0.1', 35425): sending "It will be sent "('127.0.0.1', 35424): received "It will be sent "('127.0.0.1', 35425): received "It will be sent "('127.0.0.1', 35424): sending "in parts."('127.0.0.1', 35425): sending "in parts."('127.0.0.1', 35424): received "in parts."('127.0.0.1', 35425): received "in parts."
非阻塞I/0中使用超时
Select的第四个参数可以设置超时。超时时,select()返回3个空列表。
服务器端修改如下:
print >>sys.stderr, '\nwaiting for the next event'
timeout = 1
readable, writable, exceptional = select.select(
inputs, outputs, inputs, timeout)
if not (readable or writable or exceptional):
print >>sys.stderr, ' timed out, do some other work here'
continue
客户端:
import socketimport sysimport time# Create a TCP/IP socketsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# Connect the socket to the port where the server is listeningserver_address = ('localhost', 10000)print >>sys.stderr, 'connecting to %s port %s' % server_address
sock.connect(server_address)time.sleep(1)messages = [ 'Part one of the message.',
'Part two of the message.',
]amount_expected = len(''.join(messages))try:
# Send data
for message in messages:
print >>sys.stderr, 'sending "%s"' % message
sock.sendall(message)
time.sleep(1.5)
# Look for the response
amount_received = 0
while amount_received < amount_expected:
data = sock.recv(16)
amount_received += len(data)
print >>sys.stderr, 'received "%s"' % datafinally:
print >>sys.stderr, 'closing socket'
sock.close()
执行结果:
# python select_echo_server_timeout.pywaiting for the next event
timed out, do some other work here
waiting for the next event
connection from ('127.0.0.1', 52875)waiting for the next event
received "Part one of the message." from ('127.0.0.1', 52875)waiting for the next event
sending "Part one of the message." to ('127.0.0.1', 52875)waiting for the next event('127.0.0.1', 52875) queue empty
waiting for the next event
timed out, do some other work here
waiting for the next event
received "Part two of the message." from ('127.0.0.1', 52875)waiting for the next event
sending "Part two of the message." to ('127.0.0.1', 52875)waiting for the next event('127.0.0.1', 52875) queue empty
waiting for the next event
timed out, do some other work here
waiting for the next event
closing ('127.0.0.1', 52875)waiting for the next event
timed out, do some other work here
# python select_echo_slow_client.py
connecting to localhost port 10000
sending "Part one of the message."sending "Part two of the message."received "Part one of the "received "message.Part two"received " of the message."closing socket
poll
Poll和select类似,但底层实现更有效, 但不支持windows。它的timeout是毫秒为单位的,而select是秒。
Python的有类管理监控注册数据通道。通过register()添加数据通道,标志表示要关注哪些事件。标志flag列表如下:
POLLIN:Input ready
POLLPRI:Priority input ready
POLLOUT:Able to receive output
POLLERR:Error
POLLHUP:Channel closed
POLLNVAL:Channel not open
下面的实例参见《The Python Standard Library by Example 2011》
其他
Epoll是poll的扩展,Kqueue是BSD内核队列,kevent是BSD内核event。
最新版本参见
参考资料
The Python Standard Library by Example 2011
socket (http://docs.python.org/library/socket.html) The standard library documentation for this module.
本文地址
http://automationtesting.sinaapp.com/blog/m_socket_connection
本站地址:python自动化测试http://automationtesting.sinaapp.com python开发自动化测试群113938272和开发测试群6089740 微博 http://weibo.com/cizhenshi