Osheep

时光不回头,当下最重要。

OKhttp源码学习(九)—— 任务管理(Dispatcher)

源码地址:https://github.com/square/okhttp

针对具体一个请求的流程,前面已经做了学习分析,现在对OkHttp的请求任务管理进行分析学习。

使用过OkHttp的都知道,调用分为同步阻塞式的请求execute(),以及异步调用 enqueue(Callback responseCallback)
,同步请求没有什么好分析的,基本就是直接发起了请求。这里主要分析异步请求,是如何进行请求的管理和分配的。

主要的类:Dispatcher.
主要内容:

  1. 线程池
  2. 任务分发模型

1. 线程池

线程池,为解决的问题,很多资料都有具体的阐述,这里就引用一些专业的解释多线程:

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:
假设在一台服务器完成一项任务的时间为T

T1 创建线程的时间
T2 在线程中执行任务的时间,包括线程间同步所需时间
T3 线程销毁的时间
显然T = T1+T2+T3。注意这是一个极度简化的假设。
可以看出T1,T3是多线程本身的带来的开销(在Java中,通过映射pThead,并进一步通过SystemCall实现native线程),我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。

线程池,就是针对解决减少T1 和 T3的时间,提高服务的性能。

1.1 OkHttp的线程池

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

Dispatcher 通过单例创建了一个线程池,针对几个参数,可以发现,OkHttp的线程池具备特点:

  1. 线程数区间[0,Integer.MAX_VALUE],不保留最少线程数,随时创建更多线程 ;
  2. 当线程空闲的时候,最多保活时间为60s;
  3. 使用一个同步队列作为工作队列,先进先出;
  4. 创建一个名为“OkHttp Dispatcher” 的线程工厂 ThreadFactory 。

线程池的处理,就到这里,下面就开始,分析OkHttp是如何使用这个线程池来进行请求任务的调度和分配的。

2. 任务分发模型

在我们发起一个异步请求的时候,其实是交给了Dispatcher来处理的
RealCall.java

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    //处理再这里
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

在Dispatcher中:

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 添加到runningAsyncCalls队列中
      runningAsyncCalls.add(call);
      //线程池的调用
      executorService().execute(call);
    } else {
      //添加到准备中的队列中
      readyAsyncCalls.add(call);
    }
  }

上面的代码中几个重要的变量:

  1. runningAsyncCalls 储存运行中的异步请求队列
  2. readyAsyncCalls 储存准备中的异步请求队列
  3. maxRequests 最大请求数量(64个)
  4. maxRequestsPerHost 相同Host最大请求数量(5)

当如果运行中的请求少用64个以及相同 Host的请求小于5个的时候,直接添加到runningAsyncCalls并且调用线程池来执行这个AsyncCall,交给线程池去调度。 如果已经超出了这个限制,就把这个请求添加到 readyAsyncCalls,等待调用。但是这个准备中的队列是什么时候被调用的呢?

撸一下代码,发现在AsyncCall的execute方法里面。也就是这个异步线程的执行方法里面:

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        //执行真正的请求
        Response response = getResponseWithInterceptorChain();

        //通过CallBack 回调给用户
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        //重点在这里
        client.dispatcher().finished(this);
      }
    }

最后的时候调用了Dispatcher的finished,继续向下撸:

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //重点在这里
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

这里会调用一个 promoteCalls()方法:

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
       // 加入到运行中的队列,并执行。
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

终于找到了,在请求完成的时候,调用Dispatcher的finished同时,会检查这个时候准备中的请求,是否有可以添加到运行请求中线程池中去的。

3. 总结

整个任务管理的流程,其实也不复杂:

  1. 通过两个请求队列,来管理请求数量以及准备中的请求的数量;
  2. 请求交一个线程池来完成。

最后用一个图的形式总结一下,OkHttp的请求任务管理的实现:

《OKhttp源码学习(九)—— 任务管理(Dispatcher)》

分配管理

系列:
OKhttp源码学习(一)—— 基本请求流程
OKhttp源码学习(二)—— OkHttpClient
OKhttp源码学习(三)—— Request, RealCall
OKhttp源码学习(四)—— RetryAndFollowUpInterceptor拦截器
OKhttp源码学习(五)—— BridgeInterceptor拦截器
OKhttp源码学习(六)—— CacheInterceptor拦截器
OKhttp源码学习(七)—— ConnectInterceptor拦截器
OKhttp源码学习(八)—— CallServerInterceptor拦截器

点赞