1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
   14
   15
   16
   17
   18
   19
   20
   21
   22
   23
   24
   25
   26
   27
   28
   29
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   60
   61
   62
   63
   64
   65
   66
   67
   68
   69
   70
   71
   72
   73
   74
   75
   76
   77
   78
   79
   80
   81
   82
   83
   84
   85
   86
   87
   88
   89
   90
   91
   92
   93
   94
   95
   96
   97
   98
   99
  100
  101
  102
  103
  104
  105
  106
  107
  108
  109
  110
  111
  112
  113
  114
  115
  116
  117
  118
  119
  120
  121
  122
  123
  124
  125
  126
  127
  128
  129
  130
  131
  132
  133
  134
  135
  136
  137
  138
  139
  140
  141
  142
  143
  144
  145
  146
  147
  148
  149
  150
  151
  152
  153
  154
  155
  156
  157
  158
  159
  160
  161
  162
  163
  164
  165
  166
  167
  168
  169
  170
  171
  172
  173
  174
  175
  176
  177
  178
  179
  180

cc / raster / categorized_worker_pool.h [blame]

// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef CC_RASTER_CATEGORIZED_WORKER_POOL_H_
#define CC_RASTER_CATEGORIZED_WORKER_POOL_H_

#include <memory>
#include <optional>

#include "base/containers/span.h"
#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/task/post_job.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/task_runner.h"
#include "base/thread_annotations.h"
#include "base/threading/platform_thread.h"
#include "cc/cc_export.h"
#include "cc/raster/task_category.h"
#include "cc/raster/task_graph_runner.h"
#include "cc/raster/task_graph_work_queue.h"

namespace cc {

// A pool of threads used to run categorized work. Work can be scheduled on
// the threads through several interfaces:
// 1. The pool itself implements the base::TaskRunner interface; tasks posted
//    via that interface might run in parallel.
// 2. The pool also implements the TaskGraphRunner interface, which allows
//    scheduling a graph of tasks together with their dependencies.
// 3. CreateSequencedTaskRunner() creates a sequenced task runner that might run
//    in parallel with other instances of sequenced task runners.
//
// This class leaves FlushForTesting(), Start() and Shutdown() pure virtual, so
// it cannot be instantiated directly; a subclass supplies them.
class CC_EXPORT CategorizedWorkerPool : public base::TaskRunner,
                                        public TaskGraphRunner {
 public:
  // Optional observer interface handed to GetOrCreate().
  class CC_EXPORT Delegate {
   public:
    virtual ~Delegate() = default;

    // Called on the delegate with a worker pool thread ID as soon as the
    // thread is created.
    virtual void NotifyThreadWillRun(base::PlatformThreadId tid) = 0;
  };

  CategorizedWorkerPool();

  // Get or create the singleton worker pool. This object lives forever. If
  // `delegate` is non-null, it must also live forever.
  static CategorizedWorkerPool* GetOrCreate(Delegate* delegate = nullptr);

  // Overridden from TaskGraphRunner:
  NamespaceToken GenerateNamespaceToken() override;
  void WaitForTasksToFinishRunning(NamespaceToken token) override;
  void CollectCompletedTasks(NamespaceToken token,
                             Task::Vector* completed_tasks) override;
  void RunTasksUntilIdleForTest() override;

  // Test-only hook supplied by subclasses.
  // NOTE(review): presumably blocks until outstanding work has drained --
  // confirm against the implementation.
  virtual void FlushForTesting() = 0;

  // Starts the pool; supplied by subclasses.
  // NOTE(review): `max_concurrency_foreground` presumably caps the number of
  // foreground tasks that run concurrently -- confirm against the
  // implementation.
  virtual void Start(int max_concurrency_foreground) = 0;

  // Finishes running all the posted tasks (and nested tasks posted by those
  // tasks) of all the associated task runners.
  // Once all the tasks are executed, the method blocks until the threads are
  // terminated.
  virtual void Shutdown() = 0;

  // Returns this pool through its TaskGraphRunner interface.
  TaskGraphRunner* GetTaskGraphRunner() { return this; }

  // Creates a new sequenced task runner (see item 3 of the class comment).
  scoped_refptr<base::SequencedTaskRunner> CreateSequencedTaskRunner();

 protected:
  // Implementation type behind CreateSequencedTaskRunner(); defined elsewhere.
  // Declared a friend so it can reach the protected members of the pool.
  class CategorizedWorkerPoolSequencedTaskRunner;
  friend class CategorizedWorkerPoolSequencedTaskRunner;

  // Protected, so code outside the class hierarchy (and friends) cannot
  // destroy a pool directly.
  ~CategorizedWorkerPool() override;

  // Simple Task for the TaskGraphRunner that wraps a closure.
  // This class is used to schedule TaskRunner tasks through the task graph
  // runner.
  class ClosureTask : public Task {
   public:
    // Takes ownership of `closure`.
    explicit ClosureTask(base::OnceClosure closure);

    // Not copyable.
    ClosureTask(const ClosureTask&) = delete;
    ClosureTask& operator=(const ClosureTask&) = delete;

    // Overridden from Task:
    // NOTE(review): presumably runs the wrapped closure -- confirm in the .cc.
    void RunOnWorkerThread() override;

   protected:
    ~ClosureTask() override;

   private:
    // The wrapped closure.
    base::OnceClosure closure_;
  };

  // Counterpart of CollectCompletedTasks() for callers that already hold
  // `lock_`.
  void CollectCompletedTasksWithLockAcquired(NamespaceToken token,
                                             Task::Vector* completed_tasks)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Determines if we should run a new task for the given category. This factors
  // in whether a task is available and whether the count of running tasks is
  // low enough to start a new one. The caller must hold `lock_`.
  bool ShouldRunTaskForCategoryWithLockAcquired(TaskCategory category)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Lock to exclusively access all the following members that are used to
  // implement the TaskRunner and TaskGraphRunner interfaces.
  mutable base::Lock lock_;
  // Stores the tasks to be run, sorted by priority.
  TaskGraphWorkQueue work_queue_ GUARDED_BY(lock_);
  // Namespace used to schedule tasks in the task graph runner. Const, and
  // therefore not annotated as guarded by `lock_`.
  const NamespaceToken namespace_token_;
  // List of tasks currently queued up for execution.
  Task::Vector tasks_ GUARDED_BY(lock_);
  // Graph object used for scheduling tasks.
  TaskGraph graph_ GUARDED_BY(lock_);
  // Cached vector to avoid allocation when getting the list of complete
  // tasks.
  Task::Vector completed_tasks_ GUARDED_BY(lock_);
  // Condition variable that is waited on by origin threads until a namespace
  // has finished running all associated tasks.
  // NOTE(review): presumably bound to `lock_` at construction -- confirm in the
  // constructor.
  base::ConditionVariable has_namespaces_with_finished_running_tasks_cv_;
  // Condition variable signalled when there are no more tasks ready to run.
  base::ConditionVariable workers_are_idle_cv_;
};

// CategorizedWorkerPool subclass that overrides the pure-virtual
// FlushForTesting(), Start() and Shutdown() hooks and keeps two
// base::JobHandle members, one for foreground and one for background work.
class CC_EXPORT CategorizedWorkerPoolJob : public CategorizedWorkerPool {
 public:
  CategorizedWorkerPoolJob();

  // Overridden from base::TaskRunner:
  bool PostDelayedTask(const base::Location& from_here,
                       base::OnceClosure task,
                       base::TimeDelta delay) override;

  // Overridden from TaskGraphRunner:
  void ScheduleTasks(NamespaceToken token, TaskGraph* graph) override;
  void ExternalDependencyCompletedForTask(NamespaceToken token,
                                          scoped_refptr<Task> task) override;

  // Runs a task from one of the provided categories. Categories listed first
  // have higher priority.
  // NOTE(review): `job_delegate` is presumably the delegate supplied by the
  // base job that invokes this method -- confirm at the call site.
  void Run(base::span<const TaskCategory> categories,
           base::JobDelegate* job_delegate);

  // Overridden from CategorizedWorkerPool:
  void FlushForTesting() override;
  void Start(int max_concurrency_foreground) override;
  void Shutdown() override;

 private:
  ~CategorizedWorkerPoolJob() override;

  // Picks the next task to run from `categories`, or returns std::nullopt when
  // there is none.
  // NOTE(review): unlike the other *WithLockAcquired helpers, this declaration
  // carries no EXCLUSIVE_LOCKS_REQUIRED(lock_) annotation -- confirm against
  // the implementation whether callers must hold `lock_` or whether the method
  // takes it itself.
  std::optional<TaskGraphWorkQueue::PrioritizedTask>
  GetNextTaskToRunWithLockAcquired(base::span<const TaskCategory> categories);

  // Lock-held counterpart of ScheduleTasks(); the caller must hold `lock_`.
  // NOTE(review): the returned handle is presumably the job the caller should
  // notify (possibly null when nothing needs notifying) -- confirm in the .cc.
  base::JobHandle* ScheduleTasksWithLockAcquired(NamespaceToken token,
                                                 TaskGraph* graph)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Helper function which signals worker threads if tasks are ready to run.
  // The caller must hold `lock_`.
  // NOTE(review): the signature suggests it returns the handle to notify rather
  // than notifying it directly -- confirm in the .cc.
  base::JobHandle* GetJobHandleToNotifyWithLockAcquired()
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // NOTE(review): presumably the maximum number of workers wanted for the job
  // serving `categories` -- confirm in the .cc.
  size_t GetMaxJobConcurrency(base::span<const TaskCategory> categories) const;

  // Defaults to 0.
  // NOTE(review): presumably assigned from the argument of Start() -- confirm.
  size_t max_concurrency_foreground_ = 0;

  // Handles for the background and foreground jobs.
  base::JobHandle background_job_handle_;
  base::JobHandle foreground_job_handle_;
};

}  // namespace cc

#endif  // CC_RASTER_CATEGORIZED_WORKER_POOL_H_