新的一年到了,首先在这里给各位老司机们拜个年,祝大家鸡年大吉吧!
然后承接上一篇讲到的关于okhttp websocket的使用(还没看的小伙伴们,请点传送门),今天将深入源码进一步了解它的运作过程。
OkHttp Websocket 源码深入
建立websocket连接
//新建client
OkHttpClient client = new OkHttpClient.Builder()
.build();
//构造request对象
Request request = new Request.Builder()
.url(wsUrl)
.build();
//建立连接
client.newWebSocket(request, new WebSocketListener(){});
websocket的建立,其最关键的区别就在于最后是通过OkHttpClient
的实例调用该方法newWebSocket
来实现与服务器的建连,并且该方法会返回一个WebSocket
的实例。
我们先来看看WebSocket
类,这是一个接口类,定义了websocket用到的方法:
public interface WebSocket {
/** 得到该连接的原始请求 */
Request request();
/**
* 该方法返回要传输到服务器的所有消息的大小(以字节为单位)
* 但不包括一些帧的开销和缓冲区的字节
* 当没有等待发送的消息时,该方法返回0
* 当websocket被取消后,可能会返回非零值表示队列存在消息但没有发送出去
*/
long queueSize();
/**
* 发送一个String类型的消息
* 该方法会立刻返回一个发送是否成功的标识
* true即表示消息成功加入队列当中
* false则可能由一下几种情况造成:
* 1. 消息将造成消息缓冲区的溢出,因此拒绝发送
* 2. websocket 处在closing,closed,canceled的状态
*/
boolean send(String text);
/**
* 发送一个ByteString类型的消息
* 其余同上
* 关于ByteString类型:
* 是okio包中提供的一种不可改变的byte序列。提供了一种基于String,采用char访问的二进制模式
* 通过ByteString可以像一般value一样处理二进制数据
* 并且提供了对encode/decode中的HEX,Base64以及UTF-8支持
*/
boolean send(ByteString bytes);
/**
* 尝试正常关闭websocket
* 此时已在队列的消息将在关闭之前发送
* 但后续请求将无法加入到消息队列中,并且调用的send方法将会返回一个false
*
* 该方法会立刻一个关闭websocket是否成功的标识
* true即表示websocket被正常关闭
* false则表示该websocket正进行关闭,或已经关闭或已经被取消
* 参数code: 状态码的定义可由下面的websocket规范得知
* http://tools.ietf.org/html/rfc6455#section-7.4
*/
boolean close(int code, String reason);
/**
* 立刻释放websocket并丢弃队列中的消息
* 当websocket已经被关闭或取消时该方法无效
*/
void cancel();
/** 创建websocket实例 */
interface Factory {
WebSocket newWebSocket(Request request, WebSocketListener listener);
}
}
而从WebSocket
的实例是由OkHttpClient
的实例调用创建出来的,我们就不难发现OkHttpClient
实现了了WebSocket.Factory
的newWebSocket
方法:
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new SecureRandom());
webSocket.connect(this);
return webSocket;
}
而这个RealWebSocket
正是WebSocket
的实现类:
public RealWebSocket(Request request, WebSocketListener listener, Random random) {
if (!"GET".equals(request.method())) {
throw new IllegalArgumentException("Request must be GET: " + request.method());
}
this.originalRequest = request;
this.listener = listener;
this.random = random;
byte[] nonce = new byte[16];
random.nextBytes(nonce);
this.key = ByteString.of(nonce).base64();
/** 用于轮询队列并发送消息 */
this.writerRunnable = new Runnable() {
@Override public void run() {
try {
while (writeOneFrame()) {
}
} catch (IOException e) {
failWebSocket(e, null);
}
}
};
}
通过该类调用connect()
方法,最终实现websocket连接的建立:
public void connect(OkHttpClient client) {
client = client.newBuilder()
.protocols(ONLY_HTTP1)
.build();
final int pingIntervalMillis = client.pingIntervalMillis();
final Request request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build();
call = Internal.instance.newWebSocketCall(client, request);
/**
* WebSocket协议首先会通过发送一个http请求来完成一个握手的过程
* 客户端发送一个请求协议升级的get请求给服务端
* 服务端如果支持的话会返回http code 为101,表示可以切换到对应的协议
*/
call.enqueue(new Callback() {
@Override public void onResponse(Call call, Response response) {
try {
/** 这里就对http code是否为101进行了校验 */
checkResponse(response);
} catch (ProtocolException e) {
failWebSocket(e, response);
closeQuietly(response);
return;
}
StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
streamAllocation.noNewStreams();
Streams streams = new ClientStreams(streamAllocation);
try {
/** 成功升级为websocket协议后,websocket正式启动,这里将其状态回调出去 */
listener.onOpen(RealWebSocket.this, response);
String name = "OkHttp WebSocket " + request.url().redact();
/** 初始化了websocket流的读写 */
initReaderAndWriter(name, pingIntervalMillis, streams);
streamAllocation.connection().socket().setSoTimeout(0);
/** 开启轮询读取websocket的消息 */
loopReader();
} catch (Exception e) {
failWebSocket(e, null);
}
}
@Override public void onFailure(Call call, IOException e) {
failWebSocket(e, null);
}
});
}
至此完成了websocket的建连。
websocket的消息读取
在通过newWebSocket
方法实现与服务器的建连的时候,参数中除了传入request
对象外,还需要传入WebSocketListener
来实现websocket连接的状态与消息读取的回调:
client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
}
@Override
public void onMessage(WebSocket webSocket, String text) {
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
}
});
进一步找到onMessage
调用的位置:
@Override public void onReadMessage(String text) throws IOException {
listener.onMessage(this, text);
}
@Override public void onReadMessage(ByteString bytes) throws IOException {
listener.onMessage(this, bytes);
}
onReadMessage
是在WebSocketReader.FrameCallback
中定义的并交由RealWebSocket
实现:
final class WebSocketReader {
public interface FrameCallback {
void onReadMessage(String text) throws IOException;
void onReadMessage(ByteString bytes) throws IOException;
void onReadPing(ByteString buffer);
void onReadPong(ByteString buffer);
void onReadClose(int code, String reason);
}
那么这个onReadMessage
又是在哪调用的呢?
回到上面提到的websocket建连的时候,我们通过loopReader()
开启轮询读取websocket的消息:
public void connect(OkHttpClient client) {
...
call.enqueue(new Callback() {
@Override public void onResponse(Call call, Response response) {
...
loopReader();
...
}
...
});
}
每次消息都经由processNextFrame
进行处理:
void processNextFrame() throws IOException {
/** 校验消息合法性,并判断该数据帧为控制帧还是消息帧 */
readHeader();
if (isControlFrame) {
readControlFrame();
} else {
readMessageFrame();
}
}
我们发现onReadMessage
正是在此处被调用:
private void readMessageFrame() throws IOException {
int opcode = this.opcode;
if (opcode != OPCODE_TEXT && opcode != OPCODE_BINARY) {
throw new ProtocolException("Unknown opcode: " + toHexString(opcode));
}
Buffer message = new Buffer();
readMessage(message);
if (opcode == OPCODE_TEXT) {
frameCallback.onReadMessage(message.readUtf8());
} else {
frameCallback.onReadMessage(message.readByteString());
}
}
简单来说,在websocket建立的时候,我们实例了WebSocketReader
用于处理服务端发送过来的消息并通过方法loopReader
开启了轮询进行消息的读取和处理,最后经过FrameCallback
,WebSocketListener
回调从而获得最终的消息。
websocket的消息发送
通过WebSocket
的实例调用send
实现消息的发送:
mWebSocket.send("msg");
其具体的实现是在RealWebSocket
下:
@Override public boolean send(String text) {
if (text == null) throw new NullPointerException("text == null");
return send(ByteString.encodeUtf8(text), OPCODE_TEXT);
}
@Override public boolean send(ByteString bytes) {
if (bytes == null) throw new NullPointerException("bytes == null");
return send(bytes, OPCODE_BINARY);
}
它们最终都一致调用了如下方法:
private synchronized boolean send(ByteString data, int formatOpcode) {
//校验websocket状态
if (failed || enqueuedClose) return false;
//消息溢出则发送失败
if (queueSize + data.size() > MAX_QUEUE_SIZE) {
close(CLOSE_CLIENT_GOING_AWAY, null);
return false;
}
//消息成功入队
queueSize += data.size();
messageAndCloseQueue.add(new Message(formatOpcode, data));
runWriter();
return true;
}
private void runWriter() {
assert (Thread.holdsLock(this));
if (executor != null) {
executor.execute(writerRunnable);
}
}
可以看出实现发送的关键在于writerRunnable
,而它在实例化RealWebSocket
已经完成定义:
public RealWebSocket(Request request, WebSocketListener listener, Random random) {
...
this.writerRunnable = new Runnable() {
@Override public void run() {
try {
/** 持续发送消息直至队列为空或者websocket连接失败 */
while (writeOneFrame()) {
}
} catch (IOException e) {
failWebSocket(e, null);
}
}
};
}
//从队列中取出数据帧并发送出去;优先处理pong帧再到消息帧关闭帧。
boolean writeOneFrame() throws IOException {
WebSocketWriter writer;
ByteString pong;
Object messageOrClose = null;
int receivedCloseCode = -1;
String receivedCloseReason = null;
Streams streamsToClose = null;
synchronized (RealWebSocket.this) {
if (failed) {
return false; // websocket连接失败,跳出循环
}
writer = this.writer;
pong = pongQueue.poll();
//判断是否有pong消息
if (pong == null) {
messageOrClose = messageAndCloseQueue.poll();
//判断是否为关闭帧
if (messageOrClose instanceof Close) {
receivedCloseCode = this.receivedCloseCode;
receivedCloseReason = this.receivedCloseReason;
if (receivedCloseCode != -1) {
streamsToClose = this.streams;
this.streams = null;
this.executor.shutdown();
} else {
// When we request a graceful close also schedule a cancel of the websocket.
cancelFuture = executor.schedule(new CancelRunnable(),
((Close) messageOrClose).cancelAfterCloseMillis, MILLISECONDS);
}
} else if (messageOrClose == null) { //消息队列为空,跳出循环
return false;
}
}
}
try {
if (pong != null) {
writer.writePong(pong);
} else if (messageOrClose instanceof Message) {
ByteString data = ((Message) messageOrClose).data;
//这里将数据转化为可供websocket交互的格式
BufferedSink sink = Okio.buffer(writer.newMessageSink(
((Message) messageOrClose).formatOpcode, data.size()));
sink.write(data);
sink.close();
synchronized (this) {
queueSize -= data.size();
}
} else if (messageOrClose instanceof Close) {
Close close = (Close) messageOrClose;
writer.writeClose(close.code, close.reason);
// We closed the writer: now both reader and writer are closed.
if (streamsToClose != null) {
listener.onClosed(this, receivedCloseCode, receivedCloseReason);
}
} else {
throw new AssertionError();
}
return true;
} finally {
//释放资源
closeQuietly(streamsToClose);
}
}
至此完成了websocket消息的发送。
关闭websocket连接
mWebSocket.close();
其具体的实现是在RealWebSocket
下:
@Override
public boolean close(int code, String reason) {
return close(code, reason, CANCEL_AFTER_CLOSE_MILLIS);
}
synchronized boolean close(int code, String reason, long cancelAfterCloseMillis) {
validateCloseCode(code);
ByteString reasonBytes = null;
if (reason != null) {
reasonBytes = ByteString.encodeUtf8(reason);
if (reasonBytes.size() > CLOSE_MESSAGE_MAX) {
throw new IllegalArgumentException("reason.size() > " + CLOSE_MESSAGE_MAX + ": " + reason);
}
}
if (failed || enqueuedClose) return false;
/** 标识不能再有消息入队 */
enqueuedClose = true;
/** 到这里我们可以发现往后的步骤与消息的发送相同,这边就不再赘述 */
messageAndCloseQueue.add(new Close(code, reasonBytes, cancelAfterCloseMillis));
runWriter();
return true;
}
至此已完成了okhttp websocket主要的源码阅读。
如有不恰当或有问题的地方,欢迎大家指出。
有兴趣的小伙伴们可以自己动手实现并一步一步调试来观摩观摩源码。
这里顺便再放出一次我自己用的demo还有我对okhttp websocket的封装使用:
觉得不错的话欢迎star一个支持下~