OkHttp3的连接池及连接建立过程分析(中篇)

达芬奇密码2018-08-17 10:14

RealConnection的清理


ConnectionPool 中对于 RealConnection 的清理在put()方法中触发,执行 cleanupRunnable 来完成清理动作:


  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

cleanupRunnable每执行一次清理动作,都会等待一段时间再次执行,而具体等待的时长由cleanup()方法决定,直到cleanup()方法返回-1退出。cleanup()方法定义如下:


  /**
   * Performs maintenance on this pool, evicting the connection that has been idle the longest if
   * either it has exceeded the keep alive limit or the idle connections limit.
   *
   * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
   * -1 if no further cleanups are required.
   */
  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
  }

  /**
   * Prunes any leaked allocations and then returns the number of remaining live allocations on
   * {@code connection}. Allocations are leaked if the connection is tracking them but the
   * application code has abandoned them. Leak detection is imprecise and relies on garbage
   * collection.
   */
  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

cleanup()方法遍历connections,并从中找到处于空闲状态时间最长的一个RealConnection,然后根据查找结果的不同,分为以下几种情况处理:

  • 找到一个处于空闲状态的RealConnection,且该RealConnection处于空闲状态的时间超出了设置的保活时间,或者当前ConnectionPool中处于空闲状态的连接数超出了设置的最大空闲连接数,将该RealConnectionconnections中移除,并关闭该RealConnection关联的底层socket,然后返回0,以此请求cleanupRunnable立即再次检查所有的连接。
  • 找到一个处于空闲状态的RealConnection,但该RealConnection处于空闲状态的时间尚未超出设置的保活时间,且当前ConnectionPool中处于空闲状态的连接数尚未超出设置的最大空闲连接数,则返回保活时间与该RealConnection处于空闲状态的时间之间的差值,请求cleanupRunnable等待这么长一段时间之后再次检查所有的连接。
  • 没有找到处于空闲状态的连接,但找到了使用中的连接,则返回保活时间,请求cleanupRunnable等待这么长一段时间之后再次检查所有的连接。
  • 没有找到处于空闲状态的连接,也没有找到使用中的连接,也就意味着连接池中尚没有任何连接,则将 cleanupRunning 置为false,并返回 -1,请求 cleanupRunnable 退出。

cleanup() 通过 pruneAndGetAllocationCount() 检查正在使用一个特定连接的请求个数,并以此来判断一个连接是否处于空闲状态。后者通遍历 connection.allocations 并检查每个元素的StreamAllocation 的状态,若StreamAllocation 为空,则认为是发现了一个leak,它会更新连接的空闲时间为当前时间减去保活时间并返回0,以此请求 cleanup() 立即关闭、清理掉该 leak 的连接。

ConnectionPool的用户接口

OkHttp的用户可以自己创建 ConnectionPool 对象,这个类也提供了一些用户接口以方便用户获取空闲状态的连接数、总的连接数,以及手动清除空闲状态的连接:

  /** Returns the number of idle connections in the pool. */
  public synchronized int idleConnectionCount() {
    int total = 0;
    for (RealConnection connection : connections) {
      if (connection.allocations.isEmpty()) total++;
    }
    return total;
  }

  /**
   * Returns total number of connections in the pool. Note that prior to OkHttp 2.7 this included
   * only idle connections and HTTP/2 connections. Since OkHttp 2.7 this includes all connections,
   * both active and inactive. Use {@link #idleConnectionCount()} to count connections not currently
   * in use.
   */
  public synchronized int connectionCount() {
    return connections.size();
  }

......

  /** Close and remove all idle connections in the pool. */
  public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.allocations.isEmpty()) {
          connection.noNewStreams = true;
          evictedConnections.add(connection);
          i.remove();
        }
      }
    }

    for (RealConnection connection : evictedConnections) {
      closeQuietly(connection.socket());
    }
  }

新建流

回到新建流的过程,连接建立的各种细节处理都在这里。 StreamAllocation.newStream() 完成新建流的动作:

  public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);

      HttpCodec resultCodec;
      if (resultConnection.http2Connection != null) {
        resultCodec = new Http2Codec(client, this, resultConnection.http2Connection);
      } else {
        resultConnection.socket().setSoTimeout(readTimeout);
        resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
        resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
        resultCodec = new Http1Codec(
            client, this, resultConnection.source, resultConnection.sink);
      }

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

所谓的流,是封装了底层的IO,可以直接用来收发数据的组件,它会将请求的数据序列化之后发送到网络,并将接收的数据反序列化为应用程序方便操作的格式。在 OkHttp3 中,这样的组件被抽象为HttpCodecHttpCodec的定义如下 (okhttp/okhttp/src/main/java/okhttp3/internal/http/HttpCodec.java):

/** Encodes HTTP requests and decodes HTTP responses. */
public interface HttpCodec {
  /**
   * The timeout to use while discarding a stream of input data. Since this is used for connection
   * reuse, this timeout should be significantly less than the time it takes to establish a new
   * connection.
   */
  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;

  /** Returns an output stream where the request body can be streamed. */
  Sink createRequestBody(Request request, long contentLength);

  /** This should update the HTTP engine's sentRequestMillis field. */
  void writeRequestHeaders(Request request) throws IOException;

  /** Flush the request to the underlying socket. */
  void finishRequest() throws IOException;

  /** Read and return response headers. */
  Response.Builder readResponseHeaders() throws IOException;

  /** Returns a stream that reads the response body. */
  ResponseBody openResponseBody(Response response) throws IOException;

  /**
   * Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
   * That may happen later by the connection pool thread.
   */
  void cancel();
}

HttpCodec提供了这样的一些操作:

  • 为发送请求而提供的,写入请求头部。
  • 为发送请求而提供的,创建请求体,以用于发送请求体数据。
  • 为发送请求而提供的,结束请求发送。
  • 为获得响应而提供的,读取响应头部。
  • 为获得响应而提供的,打开请求体,以用于后续获取请求体数据。
  • 取消请求执行。

StreamAllocation.newStream() 主要做的事情正是创建HttpCodecStreamAllocation.newStream() 根据 OkHttpClient中的设置,连接超时、读超时、写超时及连接失败是否重试,调用 findHealthyConnection() 完成 连接,即RealConnection 的创建。然后根据HTTP协议的版本创建Http1Codec或Http2Codec。

findHealthyConnection() 根据目标服务器地址查找一个连接,如果它是可用的就直接返回,如果不可用则会重复查找直到找到一个可用的为止。在连接已被破坏而不可用时,还会释放连接:

  /**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

连接是否可用的标准如下 (okhttp/okhttp/src/main/java/okhttp3/internal/connection/RealConnection.java):

  /** Returns true if this connection is ready to host new streams. */
  public boolean isHealthy(boolean doExtensiveChecks) {
    if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
      return false;
    }

    if (http2Connection != null) {
      return true; // TODO: check framedConnection.shutdown.
    }

    if (doExtensiveChecks) {
      try {
        int readTimeout = socket.getSoTimeout();
        try {
          socket.setSoTimeout(1);
          if (source.exhausted()) {
            return false; // Stream is exhausted; socket is closed.
          }
          return true;
        } finally {
          socket.setSoTimeout(readTimeout);
        }
      } catch (SocketTimeoutException ignored) {
        // Read timed out; socket is good.
      } catch (IOException e) {
        return false; // Couldn't read; socket is closed.
      }
    }

    return true;
  }

首先要可以进行IO,此外对于HTTP/2,只要http2Connection存在即可。如我们前面在ConnectInterceptor 中看到的,如果HTTP请求的method不是 "GET" ,doExtensiveChecks为true时,需要做额外的检查。

findHealthyConnection() 通过 findConnection()查找一个连接:

  /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
      if (pooledConnection != null) {
        this.connection = pooledConnection;
        return pooledConnection;
      }

      selectedRoute = route;
    }

    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
      synchronized (connectionPool) {
        route = selectedRoute;
        refusedStreamCount = 0;
      }
    }
    RealConnection newConnection = new RealConnection(selectedRoute);

    synchronized (connectionPool) {
      acquire(newConnection);
      Internal.instance.put(connectionPool, newConnection);
      this.connection = newConnection;
      if (canceled) throw new IOException("Canceled");
    }

    newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
        connectionRetryEnabled);
    routeDatabase().connected(newConnection.route());

    return newConnection;
  }

findConnection() 返回一个用于流执行底层IO的连接。这个方法优先复用已经创建的连接;在没有可复用连接的情况下新建一个。

在同一次 newStream() 的执行过程中,有没有可能两次执行 findConnection() ,第一次connection 字段为空,第二次不为空?这个地方对connection字段的检查,看起来有点多余。执行 findConnection() 时,connection 不为空的话,意味着 codec 不为空,而在方法的开始处已经有对codec字段的状态做过检查。真的是这样的吗?

答案当然是否定的。同一次 newStream() 的执行过程中,没有可能两次执行findConnection(),第一次connection字段为空,第二次不为空,然而一个HTTP请求的执行过程,又不是一定只调用一次newStream()

newStream()的直接调用者是ConnectInterceptor,所有的Interceptor用RealInterceptorChain链起来,在Interceptor链中,ConnectInterceptorRetryAndFollowUpInterceptor 隔着 CacheInterceptorBridgeInterceptor 。然而newStream() 如果出错的话,则是会通过抛出Exception返回到RetryAndFollowUpInterceptor 来处理错误的。

RetryAndFollowUpInterceptor 中会尝试基于相同的 StreamAllocation 对象来恢复对HTTP请求的处理。RetryAndFollowUpInterceptor 通过 hasMoreRoutes() 等方法,来检查StreamAllocation 对象的状态,通过 streamFailed(IOException e)release()streamFinished(boolean noNewStreams, HttpCodec codec)等方法来reset StreamAllocation对象的一些状态。

回到StreamAllocationfindConnection()方法。没有连接存在,且连接池中也没有找到所需的连接时,它会新建一个连接。通过如下的步骤新建连接:

  • 为连接选择一个Route
  • 新建一个RealConnection对象。
    public RealConnection(Route route) {
      this.route = route;
    }
    
  • 将当前StreamAllocation对象的引用保存进RealConnection的allocations。如我们前面在分析ConnectionPool时所见的那样,这主要是为了追踪RealConnection
    /**
     * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
     * {@link #release} on the same connection.
     */
    public void acquire(RealConnection connection) {
      assert (Thread.holdsLock(connectionPool));
      connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
    }
    
  • RealConnection保存进连接池。
  • 保存对RealConnection的引用。
  • 检查请求是否被取消,若取消,则抛出异常。
  • 建立连接。
  • 更新RouteDatabase中Route的状态。

ConnectionSpec

在OkHttp中,ConnectionSpec用于描述传输HTTP流量的socket连接的配置。对于https请求,这些配置主要包括协商安全连接时要使用的TLS版本号和密码套件,是否支持TLS扩展等;对于http请求则几乎不包含什么信息。

OkHttp有预定义几组ConnectionSpec (okhttp/okhttp/src/main/java/okhttp3/ConnectionSpec.java):

  /** A modern TLS connection with extensions like SNI and ALPN available. */
  public static final ConnectionSpec MODERN_TLS = new Builder(true)
      .cipherSuites(APPROVED_CIPHER_SUITES)
      .tlsVersions(TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
      .supportsTlsExtensions(true)
      .build();

  /** A backwards-compatible fallback connection for interop with obsolete servers. */
  public static final ConnectionSpec COMPATIBLE_TLS = new Builder(MODERN_TLS)
      .tlsVersions(TlsVersion.TLS_1_0)
      .supportsTlsExtensions(true)
      .build();

  /** Unencrypted, unauthenticated connections for {@code http:} URLs. */
  public static final ConnectionSpec CLEARTEXT = new Builder(false).build();

预定义的这些ConnectionSpec被组织为默认ConnectionSpec集合 (okhttp/okhttp/src/main/java/okhttp3/OkHttpClient.java):

public class OkHttpClient implements Cloneable, Call.Factory {
  private static final List<Protocol> DEFAULT_PROTOCOLS = Util.immutableList(
      Protocol.HTTP_2, Protocol.HTTP_1_1);

  private static final List<ConnectionSpec> DEFAULT_CONNECTION_SPECS = Util.immutableList(
      ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS, ConnectionSpec.CLEARTEXT);

OkHttp中由OkHttpClient管理ConnectionSpec集合 。OkHttp的用户可以在构造OkHttpClient的过程中提供自己的ConnectionSpec集合。默认情况下OkHttpClient会使用前面看到的默认ConnectionSpec集合。

RetryAndFollowUpInterceptor中创建Address时,ConnectionSpec集合被从OkHttpClient获取,并由Address引用。

OkHttp还提供了ConnectionSpecSelector,用以从ConnectionSpec集合中选择与SSLSocket匹配的ConnectionSpec,并对SSLSocket做配置的操作。

StreamAllocation的findConnection()中,ConnectionSpec集合被从Address中取出来,以用于连接建立过程。

建立连接

回到连接建立的过程。RealConnection.connect()执行连接建立的过程(okhttp/okhttp/src/main/java/okhttp3/internal/connection/RealConnection.java):

  public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) {
    if (protocol != null) throw new IllegalStateException("already connected");

    RouteException routeException = null;
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    }

    while (protocol == null) {
      try {
        if (route.requiresTunnel()) {
          buildTunneledConnection(connectTimeout, readTimeout, writeTimeout,
              connectionSpecSelector);
        } else {
          buildConnection(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector);
        }
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }
  }

这里的执行过程大体如下:

  • 检查连接是否已经建立,若已经建立,则抛出异常,否则继续执行。连接是否建立由protocol 标识,它表示在整个连接建立,及可能的协议协商过程中选择的所要使用的协议。
  • 根据ConnectionSpec集合connectionSpecs构造ConnectionSpecSelector
  • 若请求不是安全的请求,会对请求再执行一些额外的限制。这些限制包括:
    • ConnectionSpec集合中必须要包含ConnectionSpec.CLEARTEXT。这也就是说,OkHttp的用户可以通过为OkHttpClient设置不包含ConnectionSpec.CLEARTEXTConnectionSpec集合来禁用所有的明文请求。
    • 平台本身的安全策略允许向相应的主机发送明文请求。对于Android平台而言,这种安全策略主要由系统的组件android.security.NetworkSecurityPolicy执行 (okhttp/okhttp/src/main/java/okhttp3/internal/platform/AndroidPlatform.java):
      @Override public boolean isCleartextTrafficPermitted(String hostname) {
      try {
        Class<?> networkPolicyClass = Class.forName("android.security.NetworkSecurityPolicy");
        Method getInstanceMethod = networkPolicyClass.getMethod("getInstance");
        Object networkSecurityPolicy = getInstanceMethod.invoke(null);
        Method isCleartextTrafficPermittedMethod = networkPolicyClass
            .getMethod("isCleartextTrafficPermitted", String.class);
        return (boolean) isCleartextTrafficPermittedMethod.invoke(networkSecurityPolicy, hostname);
      } catch (ClassNotFoundException | NoSuchMethodException e) {
        return super.isCleartextTrafficPermitted(hostname);
      } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
        throw new AssertionError();
      }
      }
      
      平台的这种安全策略并不是每个Android版本都有的。Android 6.0之后存在这种控制。
  • 根据请求是否需要建立隧道连接,而分别执行buildTunneledConnection()buildConnection()。是否需要建立隧道连接的依据为 (okhttp/okhttp/src/main/java/okhttp3/Route.java):
    /**
     * Returns true if this route tunnels HTTPS through an HTTP proxy. See <a
     * href="http://www.ietf.org/rfc/rfc2817.txt">RFC 2817, Section 5.2</a>.
     */
    public boolean requiresTunnel() {
      return address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP;
    }
    
    即对于设置了HTTP代理,且安全的连接 (SSL) 需要请求代理服务器建立一个到目标HTTP服务器的隧道连接,客户端与HTTP代理建立TCP连接,以此请求HTTP代理服务在客户端与HTTP服务器之间进行数据的盲转发。



网易云新用户大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者韩鹏飞授权发布。