OkHttp 源码解析(二)


上一篇文章介绍了 OkHttp 的基本流程,包括 Request 的创建、Dispatcher 对 Request 的调度以及 Interceptor 的使用。OkHttp 中默认会添加 RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor 以及 CallServerInterceptor 这几个拦截器。本文主要看一下 RetryAndFollupInterceptor 并引出建立连接相关的分析。


Interceptor 最主要的代码都在 intercept 中,下面是 RetryAndFollowUpInterceptor#intercept 中的部分代码:

public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);  // 1

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();  // 2
        throw new IOException("Canceled");
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); // 3

上面注释 1 处创建了一个 StreamAllocation 对象,注释 2 处 调用了其 release 方法,注释 3 处则把这个对象传给了下一个 Interceptor。StreamAlloction 这个类很重要,下面就看一下它的用途。


StreamAllocation 从名字上看是流分配器,其实它是统筹管理了几样东西,注释写的非常清楚:

 /** * This class coordinates the relationship between three entities: * * <ul> * <li><strong>Connections:</strong> physical socket connections to remote servers. These are * potentially slow to establish so it is necessary to be able to cancel a connection * currently being connected. * <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on * connections. Each connection has its own allocation limit, which defines how many * concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream * at a time, HTTP/2 typically carry multiple. * <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and * its follow up requests. We prefer to keep all streams of a single call on the same * connection for better behavior and locality. * </ul> * 

简单来说, StreamAllocation 协调了 3 样东西:

Connections : 物理的 socket 连接
Streams:逻辑上的 HTTP request/response 对。每个 Connection 有个变量 allocationLimit ,用于定义可以承载的并发的 streams 的数量。HTTP/1.x 的 Connection 一次只能有一个 stream, HTTP/2 一般可以有多个。
Calls : Streams 的序列。一个初始的 request 可能还会有后续的 request(如重定向)。OkHttp 倾向于让一个 call 所有的 streams 运行在同一个 connection 上。
StreamAllocation 提供了一些 API 来释放以上的资源对象。 在 RetryAndFollowUpInterceptor 中创建的 StreamAllocation 对象下一个用到的地方是 ConnectInterceptor,其 intercept 代码如下:

public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);

在上面的代码中, streamAllocation 创建了 httpCodec 以及 connection 对象。 httpCodec 即是上面所说的 Streams,而 connection 则是上面的 Connection,Connection 是一个接口,它的唯一实现类是 RealConnection。


StreamAllocation 中的 newStream 方法用于寻找新的 RealConnection 以及 HttpCodec,代码如下:

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
    } catch (IOException e) {
      throw new RouteException(e);

在 newStream 中,通过 findHealthyConnection 找到可用的 Connection ,并用这个 Connection 生成一个 HttpCodec 对象。 findHealthyConnection 是找到一个健康的连接,代码如下:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
      // successCount == 0 表示还未使用过,则可以使用
        if (candidate.successCount == 0) {
          return candidate;

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {

      return candidate;

public boolean isHealthy(boolean doExtensiveChecks) {
    if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
      return false;
    ... // 省略 Http2 代码
    return true;

在一个无限循环中,通过 findConnection 寻找一个 connection,并判断是否可用,首先如果没有使用过的肯定是健康的可直接返回,否则调用 isHealthy,主要就是判断 socket 是否关闭。这里的 socket 是在 findConnection 中赋值的,再看看 findConnection 的代码:

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;

      // Attempt to get a connection from the pool.
      // 1. 从 ConnectionPool 取得 connection
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;

      selectedRoute = route;

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // 2. 有了 ip 地址后再从 connectionpool中取一次
      // This could match due to connection coalescing.
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
      // 3. ConnectionPool 中没有,新创建一个
      result = new RealConnection(connectionPool, selectedRoute);
      // 3. 将 StreamAllocation 加入到 `RealConnection` 中的一个队列中

    // Do TCP + TLS handshakes. This is a blocking operation.
    // 4. 建立连接,在其中创建 socket
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      // 5. 将新创建的 connection 放到 ConnectionPool 中 
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;

    return result;

上面 Connection 的创建大体是以下几个步骤:

  1. 调用 Intenal.get 方法从 ConnectionPool 中获取一个 Connection,主要根据 url 的 host 判断,相关代码在 ConnectionPool 中。
  2. 如果没有并且又获取了 IP 地址,则再获取一次。
  3. 如果 ConnectionPool 中没有, 则新创建一个 RealConnection,并调用 acquire 将 StreamAllocation 中加入 RealConnection 中的一个队列中。
  4. 调用 RealConnection#connect 方法建立连接,在内部会创建 Socket。
  5. 将新创建的 Connection 加入到 ConnectionPool 中。

获取到了 Connection 之后,再创建一个 HttpCodec 对象。

public HttpCodec newCodec(
      OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);

根据是 Http1 还是 Http2 创建对应的 HttpCodec, 其中的 socket 是在 RealConnection 中的 connect 方法创建的。下面具体看看RealConnection。


RealConnection 封装的是底层的 Socket 连接,内部必然有一个 Socket 对象,下面是 RealConnection 内部的变量:

public final class RealConnection extends Http2Connection.Listener implements Connection {
  private static final String NPE_THROW_WITH_NULL = "throw with null exception";
  private final ConnectionPool connectionPool;
  private final Route route;

  // The fields below are initialized by connect() and never reassigned.

  /** The low-level TCP socket. */
  private Socket rawSocket;

  /** * The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or * {@link #rawSocket} itself if this connection does not use SSL. */
  private Socket socket;
  private Handshake handshake;
  private Protocol protocol;
  private Http2Connection http2Connection;
  private BufferedSource source;
  private BufferedSink sink;

  // The fields below track connection state and are guarded by connectionPool.

  /** If true, no new streams can be created on this connection. Once true this is always true. */
  public boolean noNewStreams;

  public int successCount;

  /** * The maximum number of concurrent streams that can be carried by this connection. If {@code * allocations.size() < allocationLimit} then new streams can be created on this connection. */
  public int allocationLimit = 1;

  /** Current streams carried by this connection. */
  public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();

  /** Nanotime timestamp when {@code allocations.size()} reached zero. */
  public long idleAtNanos = Long.MAX_VALUE;
  • Route 表示的是与服务端建立的路径,其实内部封装了 Address,Address 则是封装了请求的 URL。
  • rawSocket对象代表底层的连接,还有一个 socket 是用于 Https, 对于普通的 Http 请求来说,这两个对象是一样的。 source 和sink 则是利用 Okio 封装 socket 得到的输入输出流
  • noNewStream 对象用于标识这个 Connection 不能再用于 Http 请求了,一旦设置为
    true, 则不会再变。
  • allocationLimit 指的是这个 Connection 最多能同时承载几个 Http 流,对于
    Http/1 来说只能是一个。
  • allocations 是一个 List 对象,里面保存着正在使用这个 Connection 的StreamAllocation 的弱引用,当 StreamAllocation 调用 acquire 时,便会将其弱引用加入这个List,调用 release 则是移除引用。allocations 为空说明此 Connection 为闲置,ConnectionPool 利用这些信息来决定是否关闭这个连接。


RealConnection 用于建立连接,里面有相应的 connect 方法:

public void connect(
      int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
          // 创建socket,建立连接
          connectSocket(connectTimeout, readTimeout);
        // 建立

private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
    // 创建 socket
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    try {
      // 建立连接,相当于调用 socket 的 connect 方法
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      throw ce;
    try {
      // 获取输入输出流
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);

如果不是 Https, 则调用 connectSocket,在内部创建 rawSocket 对象,设置超时时间。紧接着 Platform.get().connectSocket 根据不同的平台调用相应的 connect 方法,这样 rawSocket 就连接到服务端了。然后是用 Okio 封装 rawSocket 的输入输出流,这里的输入输出流最终是交给 HttpCodec 进行 Http 报文的写入都读取。通过以上步骤,就实现了 Http 请求的连接。


本文从 RetryAndFollowupIntercept 中创建 StreamAllocation 对象,到 Connection 中创建 RealConnection 和 HttpCodec,分析了 OkHttp 建立连接的基本过程。可以看出, OkHttp 中的连接由
RealConnection 封装,Http 流的输入输出由 HttpCodec 操作,而 StreamAllocation 则统筹管理这些资源。在连接的寻找与创建过程,有个关键的东西是 ConnectionPool, 即连接池。它负责管理所有的 Connection,OkHttp 利用这个连接池进行 Connection 的重用以提高网络请求的效率。
