OkHttp3的连接池及连接建立过程分析(上篇)

达芬奇密码2018-08-17 10:14

OkHttp3通过Interceptor链来执行HTTP请求,整体的执行过程大体如下:

这些Interceptor中每一个的职责,这里不再赘述。

在OkHttp3中,StreamAllocation是用来建立执行HTTP请求所需网络设施的组件,如其名字所显示的那样,分配Stream。但它具体做的事情根据是否设置了代理,以及请求的类型,如HTTP、HTTPS或HTTP/2的不同而有所不同。代理相关的处理,包括TCP连接的建立,在 OkHttp3中的代理与路由 一文中有详细的说明。

在整个HTTP请求的执行过程中,StreamAllocation 对象分配的比较早,在RetryAndFollowUpInterceptor.intercept(Chain chain)中就完成了:

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

StreamAllocation的对象构造过程没有什么特别的:

  public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.routeSelector = new RouteSelector(address, routeDatabase());
    this.callStackTrace = callStackTrace;
  }

在OkHttp3中,okhttp3.internal.http.RealInterceptorChain将Interceptor连接成执行链。RetryAndFollowUpInterceptor借助于RealInterceptorChain将创建的StreamAllocation对象传递给后面执行的Interceptor。而在RealInterceptorChain中,StreamAllocation对象并没有被真正用到。紧跟在RetryAndFollowUpInterceptor之后执行的 okhttp3.internal.http.BridgeInterceptorokhttp3.internal.cache.CacheInterceptor,它们的职责分别是补足用户创建的请求中缺少的必须的请求头和处理缓存,也没有真正用到StreamAllocation对象。

在OkHttp3的HTTP请求执行过程中,okhttp3.internal.connection.ConnectInterceptorokhttp3.internal.http.CallServerInterceptor是与网络交互的关键。

CallServerInterceptor负责将HTTP请求写入网络IO流,并从网络IO流中读取服务器返回的数据。而ConnectInterceptor则负责为CallServerInterceptor建立可用的连接。此处 可用的 含义主要为,可以直接写入HTTP请求的数据:

  • 设置了HTTP代理的HTTP请求,与代理建立好TCP连接;
  • 设置了HTTP代理的HTTPS请求,与HTTP服务器建立通过HTTP代理的隧道连接,并完成TLS握手;
  • 设置了HTTP代理的HTTP/2请求,与HTTP服务器建立通过HTTP代理的隧道连接,并完成与服务器的TLS握手及协议协商;
  • 设置了SOCKS代理的HTTP请求,通过代理与HTTP服务器建立好连接;
  • 设置了SOCKS代理的HTTPS请求,通过代理与HTTP服务器建立好连接,并完成TLS握手;
  • 设置了SOCKS代理的HTTP/2请求,通过代理与HTTP服务器建立好连接,并完成与服务器的TLS握手及协议协商;
  • 无代理的HTTP请求,与服务器建立好TCP连接;
  • 无代理的HTTPS请求,与服务器建立TCP连接,并完成TLS握手;
  • 无代理的HTTP/2请求,与服务器建立好TCP连接,完成TLS握手及协议协商。

后面我们更详细地来看一下这个过程。

ConnectInterceptor的代码看上去比较简单:

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

ConnectInterceptorRealInterceptorChain获取前面的Interceptor传过来的StreamAllocation对象,执行 streamAllocation.newStream() 完成前述所有的连接建立工作,并将这个过程中创建的用于网络IO的RealConnection对象,以及对于与服务器交互最为关键的HttpCodec等对象传递给后面的Interceptor,也就是CallServerInterceptor

OkHttp3的连接池

在具体地分析 streamAllocation.newStream() 的执行过程之前,我们先来看一下OkHttp3的连接池的设计实现。

OkHttp3将客户端与服务器之间的连接抽象为Connection/RealConnection,为了管理这些连接的复用而设计了ConnectionPool。共享相同Address的请求可以复用连接,ConnectionPool实现了哪些连接保持打开状态以备后用的策略。

ConnectionPool是什么?

借助于ConnectionPool的成员变量声明来一窥ConnectionPool究竟是什么:

/**
 * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
 * share the same {@link Address} may share a {@link Connection}. This class implements the policy
 * of which connections to keep open for future use.
 */
public final class ConnectionPool {
  /**
   * Background threads are used to cleanup expired connections. There will be at most a single
   * thread running per connection pool. The thread pool executor permits the pool itself to be
   * garbage collected.
   */
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;

ConnectionPool的核心是RealConnection的容器,且是顺序容器,而不是关联容器。ConnectionPool用双端队列Deque<RealConnection>来保存它所管理的所有RealConnection

ConnectionPool还会对连接池中最大的空闲连接数及连接的保活时间进行控制,maxIdleConnectionskeepAliveDurationNs成员分别体现对最大空闲连接数及连接保活时间的控制。这种控制通过匿名的Runnable cleanupRunnable在线程池executor中执行,并在向连接池中添加新的RealConnection触发。

连接池ConnectionPool的创建

OkHttp3的用户可以自行创建ConnectionPool,对最大空闲连接数及连接的保活时间进行配置,并在OkHttpClient创建期间,将其传给OkHttpClient.Builder,在OkHttpClient中启用它。没有定制连接池的情况下,则在OkHttpClient.Builder构造过程中以默认参数创建:

    public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      proxySelector = ProxySelector.getDefault();
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();

ConnectionPool的默认构造过程如下:

  /**
   * Create a new connection pool with tuning parameters appropriate for a single-user application.
   * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
   * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
   */
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

在默认情况下,ConnectionPool 最多保存 5个 处于空闲状态的连接,且连接的默认保活时间为 5分钟

RealConnection的存/取

OkHttp内部的组件可以通过put()方法向ConnectionPool中添加RealConnection

  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

在向ConnectionPool中添加RealConnection时,若发现cleanupRunnable还没有运行会触发它的运行。

cleanupRunnable的职责本就是清理无效的RealConnection,只要ConnectionPool中存在RealConnection,则这种清理的需求总是存在的,因而这里会去启动cleanupRunnable。

根据需要启动了cleanupRunnable之后,将RealConnection添加进双端队列connections。

这里先启动 cleanupRunnable,后向 connections 中添加RealConnection。有没有可能发生:

启动cleanupRunnable之后,向connections中添加RealConnection之前,执行 put() 的线程被抢占,cleanupRunnable的线程被执行,它发现connections中没有任何RealConnection,于是从容地退出而导致后面添加的RealConnection永远不会得得清理。

这样的情况呢?答案是 不会。为什么呢?put()执行之前总是会用ConnectionPool对象锁来保护,而在ConnectionPool.cleanup()中,遍历connections也总是会先对ConnectionPool对象加锁保护的。即使执行 put() 的线程被抢占,cleanupRunnable的线程也会由于拿不到ConnectionPool对象锁而等待 put() 执行结束。

OkHttp内部的组件可以通过 get() 方法从ConnectionPool中获取RealConnection

  /** Returns a recycled connection to {@code address}, or null if no such connection exists. */
  RealConnection get(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.allocations.size() < connection.allocationLimit
          && address.equals(connection.route().address)
          && !connection.noNewStreams) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }

get() 方法遍历 connections 中的所有 RealConnection 寻找同时满足如下三个条件的RealConnection

  • RealConnection的allocations的数量小于allocationLimit。每个allocation代表在该RealConnection上正在执行的一个请求。这个条件用于控制相同连接上,同一时间执行的并发请求的个数。对于HTTP/2连接而言,allocationLimit限制是在连接建立阶段由双方协商的。对于HTTP或HTTPS连接而言,这个值则总是1。从RealConnection.establishProtocol()可以清晰地看到这一点:

      if (protocol == Protocol.HTTP_2) {
        socket.setSoTimeout(0); // Framed connection timeouts are set per-stream.
    
        Http2Connection http2Connection = new Http2Connection.Builder(true)
            .socket(socket, route.address().url().host(), source, sink)
            .listener(this)
            .build();
        http2Connection.start();
    
        // Only assign the framed connection once the preface has been sent successfully.
        this.allocationLimit = http2Connection.maxConcurrentStreams();
        this.http2Connection = http2Connection;
      } else {
        this.allocationLimit = 1;
      }
    
  • RealConnectionaddress 与传入的 Address 参数相等。RealConnectionaddress 描述建立连接所需的配置信息,包括对端的信息等,不难理解只有所有相关配置相等时 RealConnection 才是真正能复用的。具体看一下Address相等性比较的依据:
    @Override public boolean equals(Object other) {
     if (other instanceof Address) {
       Address that = (Address) other;
       return this.url.equals(that.url)
           && this.dns.equals(that.dns)
           && this.proxyAuthenticator.equals(that.proxyAuthenticator)
           && this.protocols.equals(that.protocols)
           && this.connectionSpecs.equals(that.connectionSpecs)
           && this.proxySelector.equals(that.proxySelector)
           && equal(this.proxy, that.proxy)
           && equal(this.sslSocketFactory, that.sslSocketFactory)
           && equal(this.hostnameVerifier, that.hostnameVerifier)
           && equal(this.certificatePinner, that.certificatePinner);
     }
     return false;
    }
    
    这种相等性的条件给人感觉还是蛮苛刻的,特别是对url的对比。 这难免会让我们有些担心,对 Address 如此苛刻的相等性比较,又有多大的机会能复用连接呢? 我们的担心其实是多余的。只有在 StreamAllocation.findConnection() 中,会通过Internal.instance 调用 ConnectionPool.get() 来获取 RealConnection
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
      if (pooledConnection != null) {
        this.connection = pooledConnection;
        return pooledConnection;
      }

      selectedRoute = route;
    }

    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
      synchronized (connectionPool) {
        route = selectedRoute;
        refusedStreamCount = 0;
      }
    }
    RealConnection newConnection = new RealConnection(selectedRoute);

    synchronized (connectionPool) {
      acquire(newConnection);
      Internal.instance.put(connectionPool, newConnection);
      this.connection = newConnection;
      if (canceled) throw new IOException("Canceled");
    }

Internal.instance的实现在OkHttpClient 中:

  static {
    Internal.instance = new Internal() {
      @Override public void addLenient(Headers.Builder builder, String line) {
        builder.addLenient(line);
      }

      @Override public void addLenient(Headers.Builder builder, String name, String value) {
        builder.addLenient(name, value);
      }

      @Override public void setCache(OkHttpClient.Builder builder, InternalCache internalCache) {
        builder.setInternalCache(internalCache);
      }

      @Override public boolean connectionBecameIdle(
          ConnectionPool pool, RealConnection connection) {
        return pool.connectionBecameIdle(connection);
      }

      @Override public RealConnection get(
          ConnectionPool pool, Address address, StreamAllocation streamAllocation) {
        return pool.get(address, streamAllocation);
      }

      @Override public void put(ConnectionPool pool, RealConnection connection) {
        pool.put(connection);
      }

      @Override public RouteDatabase routeDatabase(ConnectionPool connectionPool) {
        return connectionPool.routeDatabase;
      }

      @Override public StreamAllocation callEngineGetStreamAllocation(Call call) {
        return ((RealCall) call).streamAllocation();
      }

可见 ConnectionPool.get()Address 参数来自于StreamAllocationStreamAllocationAddress 在构造时由外部传入。构造了StreamAllocation对象的RetryAndFollowUpInterceptor,其构造Address的过程是这样的:

  private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    if (url.isHttps()) {
      sslSocketFactory = client.sslSocketFactory();
      hostnameVerifier = client.hostnameVerifier();
      certificatePinner = client.certificatePinner();
    }

    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
        sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
        client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
  }

Address 除了 uriHosturiPort 外的所有构造参数均来自于OkHttpClient,而Addressurl 字段正是根据这两个参数构造的:

  public Address(String uriHost, int uriPort, Dns dns, SocketFactory socketFactory,
      SSLSocketFactory sslSocketFactory, HostnameVerifier hostnameVerifier,
      CertificatePinner certificatePinner, Authenticator proxyAuthenticator, Proxy proxy,
      List<Protocol> protocols, List<ConnectionSpec> connectionSpecs, ProxySelector proxySelector) {
    this.url = new HttpUrl.Builder()
        .scheme(sslSocketFactory != null ? "https" : "http")
        .host(uriHost)
        .port(uriPort)
        .build();

可见 Addressurl 字段仅包含HTTP请求url的 schema + host + port 这三部分的信息,而不包含 path 和 query 等信息。ConnectionPool主要是根据服务器的地址来决定复用的。

  • RealConnection还有可分配的Stream。对于HTTP或HTTPS而言,不能同时在相同的连接上执行多个请求。即使对于HTTP/2而言,StreamID的空间也是有限的,同一个连接上的StreamID总有分配完的时候,而在StreamID被分配完了之后,该连接就不能再被使用了。

OkHttp内部对ConnectionPool的访问总是通过Internal.instance来进行。整个OkHttp中也只有StreamAllocation 存取了 ConnectionPool,也就是我们前面列出的StreamAllocation.findConnection() 方法,相关的组件之间的关系大体如下图:


相关阅读:

OkHttp3的连接池及连接建立过程分析(上篇)

OkHttp3的连接池及连接建立过程分析(中篇)

OkHttp3的连接池及连接建立过程分析(下篇)


网易云新用户大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者韩鹏飞授权发布。